You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:42 UTC
[incubator-wayang] 04/11: [WAYANG-34] add the ObjectFileSource to
the basic and platforms
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 93f9c625c6be9153dae39b5b29542ba5473611cc
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 13:57:26 2021 +0200
[WAYANG-34] add the ObjectFileSource to the basic and platforms
Signed-off-by: bertty <be...@gmail.com>
---
.../scala/org/apache/wayang/api/PlanBuilder.scala | 11 +-
.../wayang/basic/operators/ObjectFileSource.java | 197 +++++++++++++++++++++
.../org/apache/wayang/flink/mapping/Mappings.java | 1 +
.../flink/mapping/ObjectFileSourceMapping.java | 67 +++++++
.../flink/operators/FlinkObjectFileSource.java | 16 +-
.../org/apache/wayang/java/mapping/Mappings.java | 1 +
.../java/mapping/ObjectFileSourceMapping.java | 66 +++++++
.../java/operators/JavaObjectFileSource.java | 19 +-
.../org/apache/wayang/spark/mapping/Mappings.java | 1 +
.../spark/mapping/ObjectFileSourceMapping.java | 68 +++++++
.../spark/operators/SparkObjectFileSource.java | 16 +-
11 files changed, 439 insertions(+), 24 deletions(-)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 3709f7f..f77ffc4 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -21,7 +21,7 @@ package org.apache.wayang.api
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{CollectionSource, TableSource, TextFileSource}
+import org.apache.wayang.basic.operators.{CollectionSource, ObjectFileSource, TableSource, TextFileSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -111,6 +111,15 @@ class PlanBuilder(wayangContext: WayangContext, private var jobName: String = nu
*/
def readTextFile(url: String): DataQuanta[String] = load(new TextFileSource(url))
+
+  /**
+   * Reads an object file and provides its contents as a dataset of `T`s.
+   *
+   * @param url the URL of the object file
+   * @tparam T the type of the objects stored in the file
+   * @return [[DataQuanta]] representing the file
+   */
+  def readObjectFile[T: ClassTag](url: String): DataQuanta[T] = {
+    val source = new ObjectFileSource[T](url, dataSetType[T])
+    load(source)
+  }
+
/**
* Reads a database table and provides them as a dataset of [[Record]]s.
*
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java
new file mode 100644
index 0000000..7ca3410
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.OptionalLong;
+import org.apache.commons.lang3.Validate;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
+import org.apache.wayang.core.plan.wayangplan.UnarySource;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.fs.FileSystems;
+
+/**
+ * This source reads a file of objects and outputs the contained objects as data units.
+ * <p>
+ * This operator only carries the input URL and the type of the data units; the actual reading is
+ * done by the platform-specific execution operators.
+ */
+public class ObjectFileSource<T> extends UnarySource<T> {
+
+    private final Logger logger = LogManager.getLogger(this.getClass());
+
+    /** URL of the file to read; {@code null} is accepted by the constructors. */
+    private final String inputUrl;
+
+    /** Class of the data units produced by this source. */
+    private final Class<T> tClass;
+
+    /**
+     * Creates a new instance.
+     *
+     * @param inputUrl URL of the file to read
+     * @param type     type of the data units that are read
+     */
+    public ObjectFileSource(String inputUrl, DataSetType<T> type) {
+        super(type);
+        this.inputUrl = inputUrl;
+        this.tClass = type.getDataUnitType().getTypeClass();
+    }
+
+    /**
+     * Creates a new instance with the default {@link DataSetType} for {@code tClass}.
+     *
+     * @param inputUrl URL of the file to read
+     * @param tClass   class of the data units that are read
+     */
+    public ObjectFileSource(String inputUrl, Class<T> tClass) {
+        super(DataSetType.createDefault(tClass));
+        this.inputUrl = inputUrl;
+        this.tClass = tClass;
+    }
+
+    /**
+     * Copies an instance (exclusive of broadcasts).
+     *
+     * @param that that should be copied
+     */
+    public ObjectFileSource(ObjectFileSource<T> that) {
+        super(that);
+        this.inputUrl = that.getInputUrl();
+        this.tClass = that.getTypeClass();
+    }
+
+    /**
+     * @return the URL of the file to read (may be {@code null})
+     */
+    public String getInputUrl() {
+        return this.inputUrl;
+    }
+
+    /**
+     * @return the class of the data units produced by this source
+     */
+    public Class<T> getTypeClass() {
+        return this.tClass;
+    }
+
+    @Override
+    public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
+            final int outputIndex,
+            final Configuration configuration) {
+        Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
+        return Optional.of(new ObjectFileSource.CardinalityEstimator());
+    }
+
+
+    /**
+     * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link ObjectFileSource}s.
+     */
+    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {
+
+        /** Estimate delivered whenever the input file cannot be inspected. */
+        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);
+
+        public static final double CORRECTNESS_PROBABILITY = 0.95d;
+
+        /**
+         * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}.
+         */
+        public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05;
+
+        @Override
+        public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
+            //TODO validate if the implementation apply for the case
+            Validate.isTrue(ObjectFileSource.this.getNumInputs() == inputEstimates.length);
+
+            // see Job for StopWatch measurements
+            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
+                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
+            );
+            try {
+                return this.doEstimate(optimizationContext);
+            } finally {
+                // Stop the measurement on every exit path, including job-cache hits.
+                timeMeasurement.stop();
+            }
+        }
+
+        /**
+         * Computes the cardinality estimate for the input file; the caller handles the time measurement.
+         *
+         * @param optimizationContext provides the job cache
+         * @return the cached, computed, or fallback estimate
+         */
+        private CardinalityEstimate doEstimate(OptimizationContext optimizationContext) {
+            final String inputUrl = ObjectFileSource.this.inputUrl;
+
+            // Query the job cache first to see if there is already an estimate.
+            String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), inputUrl);
+            CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
+            if (cardinalityEstimate != null) {
+                return cardinalityEstimate;
+            }
+
+            // Otherwise calculate the cardinality.
+            // First, inspect the size of the file.
+            OptionalLong fileSize = FileSystems.getFileSize(inputUrl);
+            if (!fileSize.isPresent()) {
+                ObjectFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
+                        inputUrl);
+                return this.FALLBACK_ESTIMATE;
+            }
+            if (fileSize.getAsLong() == 0L) {
+                return new CardinalityEstimate(0L, 0L, 1d);
+            }
+
+            // Then, estimate how many bytes a single data unit occupies.
+            OptionalDouble bytesPerLine = this.estimateBytesPerLine();
+            if (!bytesPerLine.isPresent()) {
+                ObjectFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.",
+                        inputUrl);
+                return this.FALLBACK_ESTIMATE;
+            }
+
+            // Extrapolate a cardinality estimate for the complete file.
+            double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble();
+            double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
+            cardinalityEstimate = new CardinalityEstimate(
+                    (long) (numEstimatedLines - expectedDeviation),
+                    (long) (numEstimatedLines + expectedDeviation),
+                    CORRECTNESS_PROBABILITY
+            );
+
+            // Cache the result, so that it will not be recalculated again.
+            optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);
+            return cardinalityEstimate;
+        }
+
+        /**
+         * Estimate the average number of bytes per data unit of the input file.
+         * <p>
+         * TODO: the newline-counting heuristic used for text files does not apply to files of
+         * objects, so no estimate is produced yet and {@link #estimate} delivers
+         * {@link #FALLBACK_ESTIMATE} for non-empty files.
+         *
+         * @return currently always {@link OptionalDouble#empty()}
+         */
+        private OptionalDouble estimateBytesPerLine() {
+            return OptionalDouble.empty();
+        }
+    }
+
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
index 3a89d6c..9a31979 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
@@ -55,6 +55,7 @@ public class Mappings {
new TextFileSinkMapping(),
new ObjectFileSinkMapping(),
new TextFileSourceMapping(),
+ new ObjectFileSourceMapping(),
new UnionAllMapping(),
new ZipWithIdMapping()
);
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java
new file mode 100644
index 0000000..85d67e9
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.operators.FlinkObjectFileSource;
+import org.apache.wayang.flink.operators.FlinkTextFileSource;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSource} to {@link FlinkObjectFileSource}.
+ */
+public class ObjectFileSourceMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                FlinkPlatform.getInstance()
+        ));
+    }
+
+    /**
+     * Creates the pattern used to find {@link ObjectFileSource}s in a plan.
+     * NOTE(review): the example operator's URL and type are placeholders, presumably not
+     * considered by the matcher — confirm against {@link OperatorPattern}.
+     */
+    @SuppressWarnings("unchecked") // the mapping API works with raw operator types
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+                "source",
+                new ObjectFileSource<>(
+                        null,
+                        DataSetType.none().getDataUnitType().getTypeClass()
+                ),
+                false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    /**
+     * Creates the factory that replaces a matched {@link ObjectFileSource} with a
+     * {@link FlinkObjectFileSource} built from it by its copy constructor.
+     */
+    @SuppressWarnings("unchecked") // the mapping API works with raw operator types
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>(
+                (matchedOperator, epoch) -> new FlinkObjectFileSource<>(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
index fb1e569..9c304a4 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -52,17 +53,18 @@ import java.util.List;
*
* @see FlinkObjectFileSource
*/
-public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements FlinkExecutionOperator {
+public class FlinkObjectFileSource<Type> extends ObjectFileSource<Type> implements FlinkExecutionOperator {
- private final String sourcePath;
+ /**
+ * Creates a Flink execution operator from the given {@link ObjectFileSource} by delegating to
+ * its copy constructor.
+ *
+ * @param that the {@link ObjectFileSource} to copy
+ */
+ public FlinkObjectFileSource(ObjectFileSource<Type> that) {
+ super(that);
+ }
public FlinkObjectFileSource(DataSetType<Type> type) {
this(null, type);
}
public FlinkObjectFileSource(String sourcePath, DataSetType<Type> type) {
- super(type);
- this.sourcePath = sourcePath;
+ super(sourcePath, type);
}
@Override
@@ -76,12 +78,12 @@ public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements Fl
assert outputs.length == this.getNumOutputs();
final String path;
- if (this.sourcePath == null) {
+ if (this.getInputUrl() == null) {
final FileChannel.Instance input = (FileChannel.Instance) inputs[0];
path = input.getSinglePath();
} else {
assert inputs.length == 0;
- path = this.sourcePath;
+ path = this.getInputUrl();
}
DataSetChannel.Instance output = (DataSetChannel.Instance) outputs[0];
flinkExecutor.fee.setParallelism(flinkExecutor.getNumDefaultPartitions());
@@ -109,7 +111,7 @@ public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements Fl
@Override
protected ExecutionOperator createCopy() {
- return new FlinkObjectFileSource<Type>(sourcePath, this.getType());
+ return new FlinkObjectFileSource<Type>(this.getInputUrl(), this.getType());
}
@Override
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
index b2809d6..fef4791 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
@@ -32,6 +32,7 @@ public class Mappings {
public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList(
new TextFileSourceMapping(),
new TextFileSinkMapping(),
+ new ObjectFileSourceMapping(),
new ObjectFileSinkMapping(),
new MapMapping(),
new MapPartitionsMapping(),
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java
new file mode 100644
index 0000000..dbc101e
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.operators.JavaObjectFileSource;
+import org.apache.wayang.java.operators.JavaTextFileSource;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSource} to {@link JavaObjectFileSource}.
+ */
+public class ObjectFileSourceMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                JavaPlatform.getInstance()
+        ));
+    }
+
+    /**
+     * Creates the pattern used to find {@link ObjectFileSource}s in a plan.
+     * NOTE(review): the example operator's URL and type are placeholders, presumably not
+     * considered by the matcher — confirm against {@link OperatorPattern}.
+     */
+    @SuppressWarnings("unchecked") // the mapping API works with raw operator types
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+                "source",
+                new ObjectFileSource<>(
+                        null,
+                        DataSetType.none().getDataUnitType().getTypeClass()
+                ),
+                false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    /**
+     * Creates the factory that replaces a matched {@link ObjectFileSource} with a
+     * {@link JavaObjectFileSource} built from it by its copy constructor.
+     */
+    @SuppressWarnings("unchecked") // the mapping API works with raw operator types
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>(
+                (matchedOperator, epoch) -> new JavaObjectFileSource<>(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
index 4973ffd..4da3b00 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -60,17 +61,17 @@ import java.util.stream.StreamSupport;
*
* @see JavaObjectFileSink
*/
-public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecutionOperator {
+public class JavaObjectFileSource<T> extends ObjectFileSource<T> implements JavaExecutionOperator {
- private final String sourcePath;
+ /**
+ * Creates a Java execution operator from the given {@link ObjectFileSource} by delegating to
+ * its copy constructor.
+ *
+ * @param that the {@link ObjectFileSource} to copy
+ */
+ public JavaObjectFileSource(ObjectFileSource<T> that) {
+ super(that);
+ }
public JavaObjectFileSource(DataSetType<T> type) {
- this(null, type);
+ super(null, type);
}
-
public JavaObjectFileSource(String sourcePath, DataSetType<T> type) {
- super(type);
- this.sourcePath = sourcePath;
+ super(sourcePath, type);
}
@Override
@@ -83,12 +84,12 @@ public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecu
SequenceFileIterator sequenceFileIterator;
final String path;
- if (this.sourcePath == null) {
+ if (this.getInputUrl() == null) {
final FileChannel.Instance input = (FileChannel.Instance) inputs[0];
path = input.getSinglePath();
} else {
assert inputs.length == 0;
- path = this.sourcePath;
+ path = this.getInputUrl();
}
try {
final String actualInputPath = FileSystems.findActualSingleInputPath(path);
@@ -110,7 +111,7 @@ public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecu
@Override
protected ExecutionOperator createCopy() {
- return new JavaObjectFileSource<>(this.sourcePath, this.getType());
+ return new JavaObjectFileSource<>(this.getInputUrl(), this.getType());
}
@Override
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 5618e78..046fb28 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -33,6 +33,7 @@ public class Mappings {
new TextFileSourceMapping(),
new TextFileSinkMapping(),
new ObjectFileSinkMapping(),
+ new ObjectFileSourceMapping(),
new MapMapping(),
new MapPartitionsMapping(),
new ReduceByMapping(),
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java
new file mode 100644
index 0000000..888b1b4
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spark.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.CollectionSource;
+import org.apache.wayang.basic.operators.ObjectFileSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.spark.operators.SparkCollectionSource;
+import org.apache.wayang.spark.operators.SparkObjectFileSource;
+import org.apache.wayang.spark.operators.SparkTextFileSource;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSource} to {@link SparkObjectFileSource}.
+ */
+public class ObjectFileSourceMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                SparkPlatform.getInstance()
+        ));
+    }
+
+    /**
+     * Creates the pattern used to find {@link ObjectFileSource}s in a plan.
+     * NOTE(review): the example operator's URL and type are placeholders, presumably not
+     * considered by the matcher — confirm against {@link OperatorPattern}.
+     */
+    @SuppressWarnings("unchecked") // the mapping API works with raw operator types
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+                "source",
+                new ObjectFileSource<>(
+                        null,
+                        DataSetType.none().getDataUnitType().getTypeClass()
+                ),
+                false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    /**
+     * Creates the factory that replaces a matched {@link ObjectFileSource} with a
+     * {@link SparkObjectFileSource} built from it by its copy constructor.
+     */
+    @SuppressWarnings("unchecked") // the mapping API works with raw operator types
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>(
+                (matchedOperator, epoch) -> new SparkObjectFileSource<>(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
index 55820f3..a084bcd 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
@@ -20,6 +20,7 @@ package org.apache.wayang.spark.operators;
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -45,19 +46,20 @@ import java.util.List;
*
* @see SparkObjectFileSink
*/
-public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExecutionOperator {
+public class SparkObjectFileSource<T> extends ObjectFileSource<T> implements SparkExecutionOperator {
private final Logger logger = LogManager.getLogger(this.getClass());
- private final String sourcePath;
+    /**
+     * Creates a Spark execution operator from the given {@link ObjectFileSource} by delegating to
+     * its copy constructor.
+     *
+     * @param that the {@link ObjectFileSource} to copy
+     */
+    public SparkObjectFileSource(ObjectFileSource<T> that) {
+        super(that);
+    }
public SparkObjectFileSource(DataSetType<T> type) {
this(null, type);
}
public SparkObjectFileSource(String sourcePath, DataSetType<T> type) {
- super(type);
- this.sourcePath = sourcePath;
+ super(sourcePath, type);
}
@Override
@@ -67,9 +69,9 @@ public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExe
SparkExecutor sparkExecutor,
OptimizationContext.OperatorContext operatorContext) {
final String sourcePath;
- if (this.sourcePath != null) {
+ if (this.getInputUrl() != null) {
assert inputs.length == 0;
- sourcePath = this.sourcePath;
+ sourcePath = this.getInputUrl();
} else {
FileChannel.Instance input = (FileChannel.Instance) inputs[0];
sourcePath = input.getSinglePath();
@@ -86,7 +88,7 @@ public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExe
@Override
protected ExecutionOperator createCopy() {
- return new SparkObjectFileSource<>(this.sourcePath, this.getType());
+ return new SparkObjectFileSource<>(this.getInputUrl(), this.getType());
}
@Override