You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2022/11/30 11:57:33 UTC

[incubator-wayang] 02/02: Sentinel Source Implementation

This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch WAYANG-agoraeo
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f1059481973e88e846432a3b236d4c5edbaceea3
Author: rodrigopardomeza <ro...@gmail.com>
AuthorDate: Wed Nov 30 12:57:19 2022 +0100

    Sentinel Source Implementation
---
 .../mappings/java/JavaSentinelSourceMapping.java   | 40 ++++++++++
 .../apache/wayang/agoraeo/mappings/mappings.java   |  4 +
 .../agoraeo/operators/basic/SentinelSource.java    | 32 ++++++++
 .../agoraeo/operators/java/JavaSentinelSource.java | 88 ++++++++++++++++++++++
 4 files changed, 164 insertions(+)

diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/java/JavaSentinelSourceMapping.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/java/JavaSentinelSourceMapping.java
new file mode 100644
index 00000000..fb94a6b0
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/java/JavaSentinelSourceMapping.java
@@ -0,0 +1,40 @@
+package org.apache.wayang.agoraeo.mappings.java;
+
+import org.apache.wayang.agoraeo.operators.basic.SentinelSource;
+import org.apache.wayang.agoraeo.operators.java.JavaSentinelSource;
+import org.apache.wayang.core.mapping.*;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class JavaSentinelSourceMapping  implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                JavaPlatform.getInstance()
+        ));
+    }
+
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+                "source",
+                new SentinelSource<>(
+                        null,
+                        DataSetType.none().getDataUnitType().getTypeClass()
+                ),
+                false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<SentinelSource>(
+                (matchedOperator, epoch) -> new JavaSentinelSource(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/mappings.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/mappings.java
new file mode 100644
index 00000000..701ae35a
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/mappings.java
@@ -0,0 +1,4 @@
+package org.apache.wayang.agoraeo.mappings;
+
+public class mappings {
+}
diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/basic/SentinelSource.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/basic/SentinelSource.java
new file mode 100644
index 00000000..18987a8f
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/basic/SentinelSource.java
@@ -0,0 +1,32 @@
+package org.apache.wayang.agoraeo.operators.basic;
+
+import org.apache.wayang.core.plan.wayangplan.UnarySource;
+import org.apache.wayang.core.types.DataSetType;
+
+import java.util.Iterator;
+
+public class SentinelSource<Type> extends UnarySource<Type> {
+
+    private Iterator<Type> iterator;
+
+    /**
+     * Default construct
+     *
+     * @param iterator {@link Iterator} Instance produced by this source
+     * @param _class type of the data produced by the iterator
+     */
+    public SentinelSource(Iterator<Type> iterator, Class<Type> _class){
+        super(DataSetType.createDefault(_class));
+        this.iterator = iterator;
+    }
+
+    public SentinelSource(SentinelSource<Type> that) {
+        super(that);
+        this.iterator = that.getIterator();
+    }
+
+    public Iterator<Type> getIterator() {
+        return this.iterator;
+    }
+
+}
diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/java/JavaSentinelSource.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/java/JavaSentinelSource.java
new file mode 100644
index 00000000..04d07ffc
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/java/JavaSentinelSource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.agoraeo.operators.java;
+
+import org.apache.wayang.agoraeo.operators.basic.SentinelSource;
+import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+import org.apache.wayang.java.operators.JavaExecutionOperator;
+
+import java.util.*;
+import java.util.stream.StreamSupport;
+
+//TODO add the documentation and add the Profile Estimator
+public class JavaSentinelSource<Type>
+    extends SentinelSource<Type>
+    implements JavaExecutionOperator {
+
+  public JavaSentinelSource(Iterator<Type> iterator, Class<Type> _class) {
+    super(iterator, _class);
+  }
+
+  public JavaSentinelSource(SentinelSource<Type> that) {
+    super(that);
+  }
+
+  @Override
+  public List<ChannelDescriptor> getSupportedInputChannels(int index) {
+    throw new UnsupportedOperationException(
+        String.format(
+            "%s does not have input channels.",
+            this
+        )
+    );
+  }
+
+  @Override
+  public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
+    assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
+    return Collections.singletonList(StreamChannel.DESCRIPTOR);
+  }
+
+  @Override
+  public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
+      ChannelInstance[] inputs, ChannelInstance[] outputs,
+      JavaExecutor javaExecutor, OperatorContext operatorContext) {
+    assert inputs.length == this.getNumInputs();
+    assert outputs.length == this.getNumOutputs();
+
+    ((StreamChannel.Instance) outputs[0]).accept(
+      StreamSupport.stream(
+          Spliterators.spliteratorUnknownSize(
+              this.getIterator(),
+              Spliterator.ORDERED
+          ),
+          false
+      )
+    );
+
+    return ExecutionOperator.modelLazyExecution(
+        inputs,
+        outputs,
+        operatorContext
+    );
+
+  }
+}