You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/27 15:08:42 UTC

[incubator-wayang] 04/11: [WAYANG-34] add the ObjectFileSource to the basic and platforms

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 93f9c625c6be9153dae39b5b29542ba5473611cc
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Mon Sep 27 13:57:26 2021 +0200

    [WAYANG-34] add the ObjectFileSource to the basic and platforms
    
    Signed-off-by: bertty <be...@gmail.com>
---
 .../scala/org/apache/wayang/api/PlanBuilder.scala  |  11 +-
 .../wayang/basic/operators/ObjectFileSource.java   | 197 +++++++++++++++++++++
 .../org/apache/wayang/flink/mapping/Mappings.java  |   1 +
 .../flink/mapping/ObjectFileSourceMapping.java     |  67 +++++++
 .../flink/operators/FlinkObjectFileSource.java     |  16 +-
 .../org/apache/wayang/java/mapping/Mappings.java   |   1 +
 .../java/mapping/ObjectFileSourceMapping.java      |  66 +++++++
 .../java/operators/JavaObjectFileSource.java       |  19 +-
 .../org/apache/wayang/spark/mapping/Mappings.java  |   1 +
 .../spark/mapping/ObjectFileSourceMapping.java     |  68 +++++++
 .../spark/operators/SparkObjectFileSource.java     |  16 +-
 11 files changed, 439 insertions(+), 24 deletions(-)

diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 3709f7f..f77ffc4 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -21,7 +21,7 @@ package org.apache.wayang.api
 import org.apache.commons.lang3.Validate
 import org.apache.wayang.api
 import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{CollectionSource, TableSource, TextFileSource}
+import org.apache.wayang.basic.operators.{CollectionSource, ObjectFileSource, TableSource, TextFileSource}
 import org.apache.wayang.commons.util.profiledb.model.Experiment
 import org.apache.wayang.core.api.WayangContext
 import org.apache.wayang.core.plan.wayangplan._
@@ -111,6 +111,15 @@ class PlanBuilder(wayangContext: WayangContext, private var jobName: String = nu
     */
   def readTextFile(url: String): DataQuanta[String] = load(new TextFileSource(url))
 
+
+  /**
+   * Read a object's file and provide it as a dataset of [[Object]]s.
+   *
+   * @param url the URL of the Object's file
+   * @return [[DataQuanta]] representing the file
+   */
+  def readObjectFile[T: ClassTag](url: String): DataQuanta[T] = load(new ObjectFileSource(url, dataSetType[T]))
+
   /**
     * Reads a database table and provides them as a dataset of [[Record]]s.
     *
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java
new file mode 100644
index 0000000..7ca3410
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.OptionalLong;
+import org.apache.commons.lang3.Validate;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
+import org.apache.wayang.core.plan.wayangplan.UnarySource;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.fs.FileSystems;
+
+/**
+ * This source reads a text file and outputs the lines as data units.
+ */
+public class ObjectFileSource<T> extends UnarySource<T> {
+
+    private final Logger logger = LogManager.getLogger(this.getClass());
+
+    private final String inputUrl;
+
+    private final Class<T> tClass;
+
+    public ObjectFileSource(String inputUrl, DataSetType<T> type) {
+        super(type);
+        this.inputUrl = inputUrl;
+        this.tClass = type.getDataUnitType().getTypeClass();
+    }
+
+    public ObjectFileSource(String inputUrl, Class<T> tClass) {
+        super(DataSetType.createDefault(tClass));
+        this.inputUrl = inputUrl;
+        this.tClass = tClass;
+    }
+
+    /**
+     * Copies an instance (exclusive of broadcasts).
+     *
+     * @param that that should be copied
+     */
+    public ObjectFileSource(ObjectFileSource that) {
+        super(that);
+        this.inputUrl = that.getInputUrl();
+        this.tClass = that.getTypeClass();
+    }
+
+    public String getInputUrl() {
+        return this.inputUrl;
+    }
+
+    public Class<T> getTypeClass(){
+        return this.tClass;
+    }
+
+    @Override
+    public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
+            final int outputIndex,
+            final Configuration configuration) {
+        Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
+        return Optional.of(new ObjectFileSource.CardinalityEstimator());
+    }
+
+
+    /**
+     * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link FlatMapOperator}s.
+     */
+    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {
+
+        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);
+
+        public static final double CORRECTNESS_PROBABILITY = 0.95d;
+
+        /**
+         * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}.
+         */
+        public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05;
+
+        @Override
+        public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
+            //TODO validate if the implementation apply for the case
+            Validate.isTrue(ObjectFileSource.this.getNumInputs() == inputEstimates.length);
+
+            // see Job for StopWatch measurements
+            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
+                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
+            );
+
+            // Query the job cache first to see if there is already an estimate.
+            String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), ObjectFileSource.this.inputUrl);
+            CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
+            if (cardinalityEstimate != null) return  cardinalityEstimate;
+
+            // Otherwise calculate the cardinality.
+            // First, inspect the size of the file and its line sizes.
+            OptionalLong fileSize = FileSystems.getFileSize(ObjectFileSource.this.inputUrl);
+            if (!fileSize.isPresent()) {
+                ObjectFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
+                        ObjectFileSource.this.inputUrl);
+                timeMeasurement.stop();
+                return this.FALLBACK_ESTIMATE;
+
+            } else if (fileSize.getAsLong() == 0L) {
+                timeMeasurement.stop();
+                return new CardinalityEstimate(0L, 0L, 1d);
+            }
+
+            OptionalDouble bytesPerLine = this.estimateBytesPerLine();
+            if (!bytesPerLine.isPresent()) {
+                ObjectFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.",
+                        ObjectFileSource.this.inputUrl);
+                timeMeasurement.stop();
+                return this.FALLBACK_ESTIMATE;
+            }
+
+            // Extrapolate a cardinality estimate for the complete file.
+            double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble();
+            double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
+            cardinalityEstimate = new CardinalityEstimate(
+                    (long) (numEstimatedLines - expectedDeviation),
+                    (long) (numEstimatedLines + expectedDeviation),
+                    CORRECTNESS_PROBABILITY
+            );
+
+            // Cache the result, so that it will not be recalculated again.
+            optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);
+
+            timeMeasurement.stop();
+            return cardinalityEstimate;
+        }
+
+        /**
+         * Estimate the number of bytes that are in each line of a given file.
+         *
+         * @return the average number of bytes per line if it could be determined
+         */
+        private OptionalDouble estimateBytesPerLine() {
+            //TODO validate if the implementation apply for the case
+//            final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(ObjectFileSource.this.inputUrl);
+//            if (fileSystem.isPresent()) {
+//
+//                // Construct a limited reader for the first x KiB of the file.
+//                final int KiB = 1024;
+//                final int MiB = 1024 * KiB;
+//                try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(
+//                    ObjectFileSource.this.inputUrl), 1 * MiB)) {
+//                    final BufferedReader bufferedReader = new BufferedReader(
+//                            new InputStreamReader(lis, ObjectFileSource.this.encoding)
+//                    );
+//
+//                    // Read as much as possible.
+//                    char[] cbuf = new char[1024];
+//                    int numReadChars, numLineFeeds = 0;
+//                    while ((numReadChars = bufferedReader.read(cbuf)) != -1) {
+//                        for (int i = 0; i < numReadChars; i++) {
+//                            if (cbuf[i] == '\n') {
+//                                numLineFeeds++;
+//                            }
+//                        }
+//                    }
+//
+//                    if (numLineFeeds == 0) {
+//                        ObjectFileSource.this.logger.warn("Could not find any newline character in {}.", ObjectFileSource.this.inputUrl);
+//                        return OptionalDouble.empty();
+//                    }
+//                    return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);
+//                } catch (IOException e) {
+//                    ObjectFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);
+//                }
+//            }
+
+            return OptionalDouble.empty();
+        }
+    }
+
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
index 3a89d6c..9a31979 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
@@ -55,6 +55,7 @@ public class Mappings {
             new TextFileSinkMapping(),
             new ObjectFileSinkMapping(),
             new TextFileSourceMapping(),
+            new ObjectFileSourceMapping(),
             new UnionAllMapping(),
             new ZipWithIdMapping()
     );
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java
new file mode 100644
index 0000000..85d67e9
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/ObjectFileSourceMapping.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.operators.FlinkObjectFileSource;
+import org.apache.wayang.flink.operators.FlinkTextFileSource;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSource} to {@link FlinkObjectFileSource}.
+ */
+@SuppressWarnings("unchecked")
+public class ObjectFileSourceMapping implements Mapping {
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                FlinkPlatform.getInstance()
+        ));
+    }
+
+
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+            "source",
+            new ObjectFileSource<>(
+                null,
+                DataSetType.none().getDataUnitType().getTypeClass()
+            ),
+            false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>(
+                (matchedOperator, epoch) -> new FlinkObjectFileSource<>(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
index fb1e569..9c304a4 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.wayang.basic.channels.FileChannel;
 import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.operators.ObjectFileSource;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -52,17 +53,18 @@ import java.util.List;
  *
  * @see FlinkObjectFileSource
  */
-public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements FlinkExecutionOperator {
+public class FlinkObjectFileSource<Type> extends ObjectFileSource<Type> implements FlinkExecutionOperator {
 
-    private final String sourcePath;
+    public FlinkObjectFileSource(ObjectFileSource<Type> that) {
+        super(that);
+    }
 
     public FlinkObjectFileSource(DataSetType<Type> type) {
         this(null, type);
     }
 
     public FlinkObjectFileSource(String sourcePath, DataSetType<Type> type) {
-        super(type);
-        this.sourcePath = sourcePath;
+        super(sourcePath, type);
     }
 
     @Override
@@ -76,12 +78,12 @@ public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements Fl
         assert outputs.length == this.getNumOutputs();
 
         final String path;
-        if (this.sourcePath == null) {
+        if (this.getInputUrl() == null) {
             final FileChannel.Instance input = (FileChannel.Instance) inputs[0];
             path = input.getSinglePath();
         } else {
             assert inputs.length == 0;
-            path = this.sourcePath;
+            path = this.getInputUrl();
         }
         DataSetChannel.Instance output = (DataSetChannel.Instance) outputs[0];
         flinkExecutor.fee.setParallelism(flinkExecutor.getNumDefaultPartitions());
@@ -109,7 +111,7 @@ public class FlinkObjectFileSource<Type> extends UnarySource<Type> implements Fl
 
     @Override
     protected ExecutionOperator createCopy() {
-        return new FlinkObjectFileSource<Type>(sourcePath, this.getType());
+        return new FlinkObjectFileSource<Type>(this.getInputUrl(), this.getType());
     }
 
     @Override
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
index b2809d6..fef4791 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
@@ -32,6 +32,7 @@ public class Mappings {
     public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList(
             new TextFileSourceMapping(),
             new TextFileSinkMapping(),
+            new ObjectFileSourceMapping(),
             new ObjectFileSinkMapping(),
             new MapMapping(),
             new MapPartitionsMapping(),
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java
new file mode 100644
index 0000000..dbc101e
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSourceMapping.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.operators.JavaObjectFileSource;
+import org.apache.wayang.java.operators.JavaTextFileSource;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSource} to {@link JavaObjectFileSource}.
+ */
+public class ObjectFileSourceMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                JavaPlatform.getInstance()
+        ));
+    }
+
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+            "source",
+            new ObjectFileSource(
+                null,
+                DataSetType.none().getDataUnitType().getTypeClass()
+            ),
+            false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>(
+                (matchedOperator, epoch) -> new JavaObjectFileSource(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
index 4973ffd..4da3b00 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSource;
 import org.apache.wayang.core.api.exception.WayangException;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -60,17 +61,17 @@ import java.util.stream.StreamSupport;
  *
  * @see JavaObjectFileSink
  */
-public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecutionOperator {
+public class JavaObjectFileSource<T> extends ObjectFileSource<T> implements JavaExecutionOperator {
 
-    private final String sourcePath;
+    public JavaObjectFileSource(ObjectFileSource<T> that) {
+        super(that);
+    }
 
     public JavaObjectFileSource(DataSetType<T> type) {
-        this(null, type);
+        super(null, type);
     }
-
     public JavaObjectFileSource(String sourcePath, DataSetType<T> type) {
-        super(type);
-        this.sourcePath = sourcePath;
+        super(sourcePath, type);
     }
 
     @Override
@@ -83,12 +84,12 @@ public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecu
 
         SequenceFileIterator sequenceFileIterator;
         final String path;
-        if (this.sourcePath == null) {
+        if (this.getInputUrl() == null) {
             final FileChannel.Instance input = (FileChannel.Instance) inputs[0];
             path = input.getSinglePath();
         } else {
             assert inputs.length == 0;
-            path = this.sourcePath;
+            path = this.getInputUrl();
         }
         try {
             final String actualInputPath = FileSystems.findActualSingleInputPath(path);
@@ -110,7 +111,7 @@ public class JavaObjectFileSource<T> extends UnarySource<T> implements JavaExecu
 
     @Override
     protected ExecutionOperator createCopy() {
-        return new JavaObjectFileSource<>(this.sourcePath, this.getType());
+        return new JavaObjectFileSource<>(this.getInputUrl(), this.getType());
     }
 
     @Override
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 5618e78..046fb28 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -33,6 +33,7 @@ public class Mappings {
             new TextFileSourceMapping(),
             new TextFileSinkMapping(),
             new ObjectFileSinkMapping(),
+            new ObjectFileSourceMapping(),
             new MapMapping(),
             new MapPartitionsMapping(),
             new ReduceByMapping(),
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java
new file mode 100644
index 0000000..888b1b4
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ObjectFileSourceMapping.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spark.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.CollectionSource;
+import org.apache.wayang.basic.operators.ObjectFileSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.spark.operators.SparkCollectionSource;
+import org.apache.wayang.spark.operators.SparkObjectFileSource;
+import org.apache.wayang.spark.operators.SparkTextFileSource;
+import org.apache.wayang.spark.platform.SparkPlatform;
+
+/**
+ * Mapping from {@link ObjectFileSource} to {@link SparkObjectFileSource}.
+ */
+public class ObjectFileSourceMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                SparkPlatform.getInstance()
+        ));
+    }
+
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+            "source",
+            new ObjectFileSource(
+                null,
+                DataSetType.none().getDataUnitType().getTypeClass()
+            ),
+            false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSource>(
+                (matchedOperator, epoch) -> new SparkObjectFileSource(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
index 55820f3..a084bcd 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java
@@ -20,6 +20,7 @@ package org.apache.wayang.spark.operators;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSource;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
 import org.apache.wayang.core.plan.wayangplan.Operator;
@@ -45,19 +46,20 @@ import java.util.List;
  *
  * @see SparkObjectFileSink
  */
-public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExecutionOperator {
+public class SparkObjectFileSource<T> extends ObjectFileSource<T> implements SparkExecutionOperator {
 
     private final Logger logger = LogManager.getLogger(this.getClass());
 
-    private final String sourcePath;
+    public SparkObjectFileSource(ObjectFileSource that) {
+        super(that);
+    }
 
     public SparkObjectFileSource(DataSetType<T> type) {
         this(null, type);
     }
 
     public SparkObjectFileSource(String sourcePath, DataSetType<T> type) {
-        super(type);
-        this.sourcePath = sourcePath;
+        super(sourcePath, type);
     }
 
     @Override
@@ -67,9 +69,9 @@ public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExe
             SparkExecutor sparkExecutor,
             OptimizationContext.OperatorContext operatorContext) {
         final String sourcePath;
-        if (this.sourcePath != null) {
+        if (this.getInputUrl() != null) {
             assert inputs.length == 0;
-            sourcePath = this.sourcePath;
+            sourcePath = this.getInputUrl();
         } else {
             FileChannel.Instance input = (FileChannel.Instance) inputs[0];
             sourcePath = input.getSinglePath();
@@ -86,7 +88,7 @@ public class SparkObjectFileSource<T> extends UnarySource<T> implements SparkExe
 
     @Override
     protected ExecutionOperator createCopy() {
-        return new SparkObjectFileSource<>(this.sourcePath, this.getType());
+        return new SparkObjectFileSource<>(this.getInputUrl(), this.getType());
     }
 
     @Override