You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:40 UTC
[incubator-wayang] 02/11: [WAYANG-34] Add ObjectFileSink support to the
Spark and Flink platforms
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit a472d3a9a6ecb2f1047eeffc81c20ee225bc9f1c
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 12:47:30 2021 +0200
[WAYANG-34] Add ObjectFileSink support to the Spark and Flink platforms
Signed-off-by: bertty <be...@gmail.com>
---
.../org/apache/wayang/flink/mapping/Mappings.java | 1 +
.../flink/mapping/ObjectFileSinkMapping.java | 64 +++++++++++++++++++++
.../flink/operators/FlinkObjectFileSink.java | 27 +++++----
.../wayang/java/operators/JavaObjectFileSink.java | 3 +-
.../org/apache/wayang/spark/mapping/Mappings.java | 1 +
.../spark/mapping/ObjectFileSinkMapping.java | 66 ++++++++++++++++++++++
.../spark/operators/SparkObjectFileSink.java | 22 +++++---
7 files changed, 166 insertions(+), 18 deletions(-)
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
index 2406544..3a89d6c 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
@@ -53,6 +53,7 @@ public class Mappings {
new SampleMapping(),
new SortMapping(),
new TextFileSinkMapping(),
+ new ObjectFileSinkMapping(),
new TextFileSourceMapping(),
new UnionAllMapping(),
new ZipWithIdMapping()
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java
new file mode 100644
index 0000000..1008119
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSink;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.operators.FlinkObjectFileSink;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSink} to {@link FlinkObjectFileSink}: the replacement operator is built from the matched sink via {@link FlinkObjectFileSink}'s copy constructor and registered for the Flink platform.
+ */
+public class ObjectFileSinkMapping implements Mapping {
+
+ @Override
+ public Collection<PlanTransformation> getTransformations() {
+ return Collections.singleton(new PlanTransformation( // exactly one transformation, bound to FlinkPlatform
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ FlinkPlatform.getInstance()
+ ));
+ }
+
+ private SubplanPattern createSubplanPattern() { // single-operator pattern named "sink" built around an example ObjectFileSink
+ final OperatorPattern operatorPattern = new OperatorPattern<>( // NOTE(review): raw OperatorPattern type (unchecked) — consider parameterizing
+ "sink",
+ new ObjectFileSink<>( // example operator with a null path and DataSetType.none()'s type class — presumably a path/type-agnostic template; confirm against OperatorPattern matching rules
+ null,
+ DataSetType.none().getDataUnitType().getTypeClass()
+ ),
+ false // NOTE(review): semantics of this flag are defined in OperatorPattern (not visible here) — confirm
+ );
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() { // replaces each matched ObjectFileSink one-for-one
+ return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>(
+ (matchedOperator, epoch) -> new FlinkObjectFileSink<>(matchedOperator).at(epoch) // copy the matched sink into a FlinkObjectFileSink, then apply at(epoch)
+ );
+ }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
index 9130a97..6e29ed7 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java
@@ -21,6 +21,7 @@ package org.apache.wayang.flink.operators;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSink;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -44,18 +45,18 @@ import java.util.List;
*
* @see FlinkObjectFileSink
*/
-public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkExecutionOperator {
-
- private final String targetPath;
+public class FlinkObjectFileSink<Type> extends ObjectFileSink<Type> implements FlinkExecutionOperator {
+ public FlinkObjectFileSink(ObjectFileSink<Type> that) { // builds a Flink sink from a basic ObjectFileSink; invoked by flink.mapping.ObjectFileSinkMapping
+ super(that); // NOTE(review): relies on ObjectFileSink's copy constructor to carry over the target path and type — confirm
+ }
public FlinkObjectFileSink(DataSetType<Type> type) {
this(null, type);
}
public FlinkObjectFileSink(String targetPath, DataSetType<Type> type) {
- super(type);
- this.targetPath = targetPath;
+ super(targetPath, type);
}
@Override
@@ -68,10 +69,16 @@ public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkE
assert inputs.length == this.getNumInputs();
assert outputs.length <= 1;
-
- final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
- final String targetPath = output.addGivenOrTempPath(this.targetPath, flinkExecutor.getConfiguration());
-
+ final FileChannel.Instance output;
+ final String targetPath;
+ if(outputs.length == 1) {
+ output = (FileChannel.Instance) outputs[0];
+ targetPath = output.addGivenOrTempPath(this.textFileUrl, flinkExecutor.getConfiguration());
+ }else{
+ targetPath = this.textFileUrl;
+ }
+
+ //TODO: remove the set parallelism 1
DataSetChannel.Instance input = (DataSetChannel.Instance) inputs[0];
final DataSink<Type> tDataSink = input.<Type>provideDataSet()
.write(new WayangFileOutputFormat<Type>(targetPath), targetPath, FileSystem.WriteMode.OVERWRITE)
@@ -83,7 +90,7 @@ public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkE
@Override
protected ExecutionOperator createCopy() {
- return new FlinkObjectFileSink<>(targetPath, this.getType());
+ return new FlinkObjectFileSink<>(this.textFileUrl, this.getType());
}
@Override
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
index 060efcf..8fbd470 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
@@ -83,8 +83,9 @@ public class JavaObjectFileSink<T> extends ObjectFileSink<T> implements JavaExec
// Prepare Hadoop's SequenceFile.Writer.
final String path;
+ FileChannel.Instance output;
if(outputs.length == 1) {
- FileChannel.Instance output = (FileChannel.Instance) outputs[0];
+ output = (FileChannel.Instance) outputs[0];
path = output.addGivenOrTempPath(this.textFileUrl,
javaExecutor.getCompiler().getConfiguration());
}else{
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 70ec802..5618e78 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -32,6 +32,7 @@ public class Mappings {
public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList(
new TextFileSourceMapping(),
new TextFileSinkMapping(),
+ new ObjectFileSinkMapping(),
new MapMapping(),
new MapPartitionsMapping(),
new ReduceByMapping(),
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java
new file mode 100644
index 0000000..bd6bed6
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spark.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSink;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.operators.JavaObjectFileSink;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.operators.SparkObjectFileSink;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSink} to {@link SparkObjectFileSink}: the replacement operator is built from the matched sink via {@link SparkObjectFileSink}'s copy constructor and registered for the Spark platform.
+ */
+public class ObjectFileSinkMapping implements Mapping { // NOTE(review): the JavaObjectFileSink/JavaPlatform imports in this file are not used by this class
+
+ @Override
+ public Collection<PlanTransformation> getTransformations() {
+ return Collections.singleton(new PlanTransformation( // exactly one transformation, bound to SparkPlatform
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ SparkPlatform.getInstance()
+ ));
+ }
+
+ private SubplanPattern createSubplanPattern() { // single-operator pattern named "sink" built around an example ObjectFileSink
+ final OperatorPattern operatorPattern = new OperatorPattern<>( // NOTE(review): raw OperatorPattern type (unchecked) — consider parameterizing
+ "sink",
+ new ObjectFileSink<>( // example operator with a null path and DataSetType.none()'s type class — presumably a path/type-agnostic template; confirm against OperatorPattern matching rules
+ null,
+ DataSetType.none().getDataUnitType().getTypeClass()
+ ),
+ false // NOTE(review): semantics of this flag are defined in OperatorPattern (not visible here) — confirm
+ );
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() { // replaces each matched ObjectFileSink one-for-one
+ return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>(
+ (matchedOperator, epoch) -> new SparkObjectFileSink<>(matchedOperator).at(epoch) // copy the matched sink into a SparkObjectFileSink, then apply at(epoch)
+ );
+ }
+}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
index 7115500..1afc9b6 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java
@@ -19,6 +19,7 @@
package org.apache.wayang.spark.operators;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSink;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -42,17 +43,18 @@ import java.util.List;
*
* @see SparkObjectFileSource
*/
-public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecutionOperator {
+public class SparkObjectFileSink<T> extends ObjectFileSink<T> implements SparkExecutionOperator {
- private final String targetPath;
+ public SparkObjectFileSink(ObjectFileSink<T> that) { // builds a Spark sink from a basic ObjectFileSink; invoked by spark.mapping.ObjectFileSinkMapping
+ super(that); // NOTE(review): relies on ObjectFileSink's copy constructor to carry over the target path and type — confirm
+ }
public SparkObjectFileSink(DataSetType<T> type) {
this(null, type);
}
public SparkObjectFileSink(String targetPath, DataSetType<T> type) {
- super(type);
- this.targetPath = targetPath;
+ super(targetPath, type);
}
@Override
@@ -64,8 +66,14 @@ public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecuti
assert inputs.length == this.getNumInputs();
assert outputs.length <= 1;
- final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
- final String targetPath = output.addGivenOrTempPath(this.targetPath, sparkExecutor.getConfiguration());
+ final String targetPath;
+ if(outputs.length > 0) {
+ final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
+ targetPath = output.addGivenOrTempPath(this.textFileUrl, sparkExecutor.getConfiguration());
+ }else{
+ targetPath = this.textFileUrl;
+ }
+
RddChannel.Instance input = (RddChannel.Instance) inputs[0];
input.provideRdd()
@@ -78,7 +86,7 @@ public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecuti
@Override
protected ExecutionOperator createCopy() {
- return new SparkObjectFileSink<>(targetPath, this.getType());
+ return new SparkObjectFileSink<>(this.textFileUrl, this.getType());
}
@Override