You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2022/11/30 11:57:31 UTC

[incubator-wayang] branch WAYANG-agoraeo updated (d199ad5c -> f1059481)

This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a change to branch WAYANG-agoraeo
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


    from d199ad5c Merge pull request #263 from apache/dependabot/maven/wayang-commons/wayang-core/com.amazonaws-aws-java-sdk-s3-1.12.261
     new 46980ab8 Iterator process to download and continue with the pipeline
     new f1059481 Sentinel Source Implementation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iterators/IteratorSentinelDownload.java        | 63 ++++++++++++++++
 .../mappings/java/JavaSentinelSourceMapping.java   | 40 ++++++++++
 .../apache/wayang/agoraeo/mappings/mappings.java   |  4 +
 .../agoraeo/operators/basic/SentinelSource.java    | 32 ++++++++
 .../agoraeo/operators/java/JavaSentinelSource.java | 88 ++++++++++++++++++++++
 5 files changed, 227 insertions(+)
 create mode 100644 wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java
 create mode 100644 wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/java/JavaSentinelSourceMapping.java
 create mode 100644 wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/mappings.java
 create mode 100644 wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/basic/SentinelSource.java
 create mode 100644 wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/java/JavaSentinelSource.java


[incubator-wayang] 02/02: Sentinel Source Implementation

Posted by rp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch WAYANG-agoraeo
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f1059481973e88e846432a3b236d4c5edbaceea3
Author: rodrigopardomeza <ro...@gmail.com>
AuthorDate: Wed Nov 30 12:57:19 2022 +0100

    Sentinel Source Implementation
---
 .../mappings/java/JavaSentinelSourceMapping.java   | 40 ++++++++++
 .../apache/wayang/agoraeo/mappings/mappings.java   |  4 +
 .../agoraeo/operators/basic/SentinelSource.java    | 32 ++++++++
 .../agoraeo/operators/java/JavaSentinelSource.java | 88 ++++++++++++++++++++++
 4 files changed, 164 insertions(+)

diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/java/JavaSentinelSourceMapping.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/java/JavaSentinelSourceMapping.java
new file mode 100644
index 00000000..fb94a6b0
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/java/JavaSentinelSourceMapping.java
@@ -0,0 +1,40 @@
+package org.apache.wayang.agoraeo.mappings.java;
+
+import org.apache.wayang.agoraeo.operators.basic.SentinelSource;
+import org.apache.wayang.agoraeo.operators.java.JavaSentinelSource;
+import org.apache.wayang.core.mapping.*;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class JavaSentinelSourceMapping  implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                JavaPlatform.getInstance()
+        ));
+    }
+
+    private SubplanPattern createSubplanPattern() {
+        final OperatorPattern operatorPattern = new OperatorPattern(
+                "source",
+                new SentinelSource<>(
+                        null,
+                        DataSetType.none().getDataUnitType().getTypeClass()
+                ),
+                false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<SentinelSource>(
+                (matchedOperator, epoch) -> new JavaSentinelSource(matchedOperator).at(epoch)
+        );
+    }
+}
diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/mappings.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/mappings.java
new file mode 100644
index 00000000..701ae35a
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/mappings/mappings.java
@@ -0,0 +1,4 @@
+package org.apache.wayang.agoraeo.mappings;
+
+public class mappings {
+}
diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/basic/SentinelSource.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/basic/SentinelSource.java
new file mode 100644
index 00000000..18987a8f
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/basic/SentinelSource.java
@@ -0,0 +1,32 @@
+package org.apache.wayang.agoraeo.operators.basic;
+
+import org.apache.wayang.core.plan.wayangplan.UnarySource;
+import org.apache.wayang.core.types.DataSetType;
+
+import java.util.Iterator;
+
+public class SentinelSource<Type> extends UnarySource<Type> {
+
+    private Iterator<Type> iterator;
+
+    /**
+     * Default construct
+     *
+     * @param iterator {@link Iterator} Instance produced by this source
+     * @param _class type of the data produced by the iterator
+     */
+    public SentinelSource(Iterator<Type> iterator, Class<Type> _class){
+        super(DataSetType.createDefault(_class));
+        this.iterator = iterator;
+    }
+
+    public SentinelSource(SentinelSource<Type> that) {
+        super(that);
+        this.iterator = that.getIterator();
+    }
+
+    public Iterator<Type> getIterator() {
+        return this.iterator;
+    }
+
+}
diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/java/JavaSentinelSource.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/java/JavaSentinelSource.java
new file mode 100644
index 00000000..04d07ffc
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/operators/java/JavaSentinelSource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.agoraeo.operators.java;
+
+import org.apache.wayang.agoraeo.operators.basic.SentinelSource;
+import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+import org.apache.wayang.java.operators.JavaExecutionOperator;
+
+import java.util.*;
+import java.util.stream.StreamSupport;
+
+//TODO add the documentation and add the Profile Estimator
+public class JavaSentinelSource<Type>
+    extends SentinelSource<Type>
+    implements JavaExecutionOperator {
+
+  public JavaSentinelSource(Iterator<Type> iterator, Class<Type> _class) {
+    super(iterator, _class);
+  }
+
+  public JavaSentinelSource(SentinelSource<Type> that) {
+    super(that);
+  }
+
+  @Override
+  public List<ChannelDescriptor> getSupportedInputChannels(int index) {
+    throw new UnsupportedOperationException(
+        String.format(
+            "%s does not have input channels.",
+            this
+        )
+    );
+  }
+
+  @Override
+  public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
+    assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
+    return Collections.singletonList(StreamChannel.DESCRIPTOR);
+  }
+
+  @Override
+  public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
+      ChannelInstance[] inputs, ChannelInstance[] outputs,
+      JavaExecutor javaExecutor, OperatorContext operatorContext) {
+    assert inputs.length == this.getNumInputs();
+    assert outputs.length == this.getNumOutputs();
+
+    ((StreamChannel.Instance) outputs[0]).accept(
+      StreamSupport.stream(
+          Spliterators.spliteratorUnknownSize(
+              this.getIterator(),
+              Spliterator.ORDERED
+          ),
+          false
+      )
+    );
+
+    return ExecutionOperator.modelLazyExecution(
+        inputs,
+        outputs,
+        operatorContext
+    );
+
+  }
+}


[incubator-wayang] 01/02: Iterator process to download and continue with the pipeline

Posted by rp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch WAYANG-agoraeo
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 46980ab85c81a141841e25d5d3b3c16509a83000
Author: Rodrigo Pardo Meza <ro...@gmail.com>
AuthorDate: Wed Nov 30 12:45:34 2022 +0100

    Iterator process to download and continue with the pipeline
---
 .../iterators/IteratorSentinelDownload.java        | 63 ++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java
new file mode 100644
index 00000000..33c769a8
--- /dev/null
+++ b/wayang-plugins/wayang-agoraeo/src/main/java/org/apache/wayang/agoraeo/iterators/IteratorSentinelDownload.java
@@ -0,0 +1,63 @@
+package org.apache.wayang.agoraeo.iterators;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+public abstract class IteratorSentinelDownload<Input> implements Iterator<Input> {
+
+    private String _name = "";
+    private String command = "";
+    private Process process = null;
+    private Iterator<Input> iteratorProcess = null;
+
+    public IteratorSentinelDownload(String name, String command) {
+        this.command = command;
+        this._name = name;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if(this.process == null){
+            return startProcess();
+        }
+        if( !this.process.isAlive() ){
+            return this.iteratorProcess.hasNext();
+        }
+        return true;
+    }
+
+    @Override
+    public Input next() {
+        if( ! this.iteratorProcess.hasNext()){
+            return this.getDefaultValue();
+        }
+        return this.iteratorProcess.next();
+    }
+
+    private boolean startProcess(){
+        try {
+            final String name = this._name;
+            this.process = Runtime.getRuntime().exec(this.command);
+            this.iteratorProcess = getLogic(
+                    new BufferedReader(
+                            new InputStreamReader(
+                                    process.getInputStream()
+                            )
+                    ).lines()
+            ).iterator();
+            return true;
+        } catch (IOException e) {}
+        return false;
+    }
+
+    protected abstract Stream<Input> getLogic(Stream<String> baseline);
+
+    protected abstract Input getDefaultValue();
+
+    protected String getName(){
+        return this._name;
+    }
+}