You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:40 UTC

[incubator-wayang] 02/11: [WAYANG-34] add the ObjectFileSink to Platform Spark and Flink

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit a472d3a9a6ecb2f1047eeffc81c20ee225bc9f1c
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 12:47:30 2021 +0200

    [WAYANG-34] add the ObjectFileSink to Platform Spark and Flink
    
    Signed-off-by: bertty <be...@gmail.com>
---
 .../org/apache/wayang/flink/mapping/Mappings.java  |  1 +
 .../flink/mapping/ObjectFileSinkMapping.java       | 64 +++++++++++++++++++++
 .../flink/operators/FlinkObjectFileSink.java       | 27 +++++----
 .../wayang/java/operators/JavaObjectFileSink.java  |  3 +-
 .../org/apache/wayang/spark/mapping/Mappings.java  |  1 +
 .../spark/mapping/ObjectFileSinkMapping.java       | 66 ++++++++++++++++++++++
 .../spark/operators/SparkObjectFileSink.java       | 22 +++++---
 7 files changed, 166 insertions(+), 18 deletions(-)

diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
index 2406544..3a89d6c 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
@@ -53,6 +53,7 @@ public class Mappings {
             new SampleMapping(),
             new SortMapping(),
             new TextFileSinkMapping(),
+            new ObjectFileSinkMapping(),
             new TextFileSourceMapping(),
             new UnionAllMapping(),
             new ZipWithIdMapping()
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java
new file mode 100644
index 0000000..1008119
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSink;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.operators.FlinkObjectFileSink;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSink} to {@link FlinkObjectFileSink}.
+ */
+public class ObjectFileSinkMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                FlinkPlatform.getInstance()
+        ));
+    }
+
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern<>(
+                "sink",
+                new ObjectFileSink<>(
+                    null,
+                    DataSetType.none().getDataUnitType().getTypeClass()
+                ),
+                false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>(
+                (matchedOperator, epoch) -> new FlinkObjectFileSink<>(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
index 9130a97..6e29ed7 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
@@ -21,6 +21,7 @@ package org.apache.wayang.flink.operators;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSink;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -44,18 +45,18 @@ import java.util.List;
  *
  * @see FlinkObjectFileSink
  */
-public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkExecutionOperator {
-
-    private final String targetPath;
+public class FlinkObjectFileSink<Type> extends ObjectFileSink<Type> implements FlinkExecutionOperator {
 
+    public FlinkObjectFileSink(ObjectFileSink<Type> that) {
+        super(that);
+    }
 
     public FlinkObjectFileSink(DataSetType<Type> type) {
         this(null, type);
     }
 
     public FlinkObjectFileSink(String targetPath, DataSetType<Type> type) {
-        super(type);
-        this.targetPath = targetPath;
+        super(targetPath, type);
     }
 
     @Override
@@ -68,10 +69,16 @@ public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkE
 
         assert inputs.length == this.getNumInputs();
         assert outputs.length <= 1;
-
-        final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
-        final String targetPath = output.addGivenOrTempPath(this.targetPath, flinkExecutor.getConfiguration());
-
+        final FileChannel.Instance output;
+        final String targetPath;
+        if(outputs.length == 1) {
+            output = (FileChannel.Instance) outputs[0];
+            targetPath = output.addGivenOrTempPath(this.textFileUrl, flinkExecutor.getConfiguration());
+        }else{
+            targetPath = this.textFileUrl;
+        }
+
+        // TODO: remove the hard-coded parallelism of 1 on the sink below
         DataSetChannel.Instance input = (DataSetChannel.Instance) inputs[0];
         final DataSink<Type> tDataSink = input.<Type>provideDataSet()
                 .write(new WayangFileOutputFormat<Type>(targetPath), targetPath, FileSystem.WriteMode.OVERWRITE)
@@ -83,7 +90,7 @@ public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkE
 
     @Override
     protected ExecutionOperator createCopy() {
-        return new FlinkObjectFileSink<>(targetPath, this.getType());
+        return new FlinkObjectFileSink<>(this.textFileUrl, this.getType());
     }
 
     @Override
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
index 060efcf..8fbd470 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
@@ -83,8 +83,9 @@ public class JavaObjectFileSink<T> extends ObjectFileSink<T> implements JavaExec
 
         // Prepare Hadoop's SequenceFile.Writer.
         final String path;
+        FileChannel.Instance output;
         if(outputs.length == 1) {
-            FileChannel.Instance output = (FileChannel.Instance) outputs[0];
+            output = (FileChannel.Instance) outputs[0];
             path = output.addGivenOrTempPath(this.textFileUrl,
                 javaExecutor.getCompiler().getConfiguration());
         }else{
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 70ec802..5618e78 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -32,6 +32,7 @@ public class Mappings {
     public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList(
             new TextFileSourceMapping(),
             new TextFileSinkMapping(),
+            new ObjectFileSinkMapping(),
             new MapMapping(),
             new MapPartitionsMapping(),
             new ReduceByMapping(),
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java
new file mode 100644
index 0000000..bd6bed6
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spark.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSink;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.operators.JavaObjectFileSink;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.operators.SparkObjectFileSink;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSink} to {@link SparkObjectFileSink}.
+ */
+public class ObjectFileSinkMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                SparkPlatform.getInstance()
+        ));
+    }
+
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern<>(
+            "sink",
+            new ObjectFileSink<>(
+                null,
+                DataSetType.none().getDataUnitType().getTypeClass()
+            ),
+            false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>(
+                (matchedOperator, epoch) -> new SparkObjectFileSink<>(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
index 7115500..1afc9b6 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
@@ -19,6 +19,7 @@
 package org.apache.wayang.spark.operators;
 
 import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSink;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -42,17 +43,18 @@ import java.util.List;
  *
  * @see SparkObjectFileSource
  */
-public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecutionOperator {
+public class SparkObjectFileSink<T> extends ObjectFileSink<T> implements SparkExecutionOperator {
 
-    private final String targetPath;
+    public SparkObjectFileSink(ObjectFileSink<T> that) {
+        super(that);
+    }
 
     public SparkObjectFileSink(DataSetType<T> type) {
         this(null, type);
     }
 
     public SparkObjectFileSink(String targetPath, DataSetType<T> type) {
-        super(type);
-        this.targetPath = targetPath;
+        super(targetPath, type);
     }
 
     @Override
@@ -64,8 +66,14 @@ public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecuti
         assert inputs.length == this.getNumInputs();
         assert outputs.length <= 1;
 
-        final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
-        final String targetPath = output.addGivenOrTempPath(this.targetPath, sparkExecutor.getConfiguration());
+        final String targetPath;
+        if(outputs.length > 0) {
+            final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
+            targetPath = output.addGivenOrTempPath(this.textFileUrl, sparkExecutor.getConfiguration());
+        }else{
+            targetPath = this.textFileUrl;
+        }
+
         RddChannel.Instance input = (RddChannel.Instance) inputs[0];
 
         input.provideRdd()
@@ -78,7 +86,7 @@ public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecuti
 
     @Override
     protected ExecutionOperator createCopy() {
-        return new SparkObjectFileSink<>(targetPath, this.getType());
+        return new SparkObjectFileSink<>(this.textFileUrl, this.getType());
     }
 
     @Override