You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/11/09 23:25:32 UTC

[GitHub] nwangtw closed pull request #3032: Nwang/add spout based source streamlet

nwangtw closed pull request #3032: Nwang/add spout based source streamlet
URL: https://github.com/apache/incubator-heron/pull/3032
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/examples/src/scala/BUILD b/examples/src/scala/BUILD
index d03d9a83a0..786ac2a9ae 100644
--- a/examples/src/scala/BUILD
+++ b/examples/src/scala/BUILD
@@ -8,6 +8,7 @@ scala_binary(
     deps = [
         "@org_apache_commons_commons_lang3//jar",
         "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
         "//heron/api/src/scala:api-scala",
         "//third_party/java:kryo"
     ],
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Builder.java b/heron/api/src/java/org/apache/heron/streamlet/Builder.java
index 8e7c6e726a..d6e4f70551 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Builder.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Builder.java
@@ -20,6 +20,7 @@
 
 package org.apache.heron.streamlet;
 
+import org.apache.heron.api.spout.IRichSpout;
 import org.apache.heron.streamlet.impl.BuilderImpl;
 
 /**
@@ -45,4 +46,12 @@ static Builder newBuilder() {
    * @return
    */
   <R> Streamlet<R> newSource(Source<R> generator);
+
+  /**
+   * Creates a new Streamlet using the provided spout
+   * @param spout The spout that emits the tuples of the streamlet
+   * @param <R> The type of the elements emitted by the streamlet
+   * @return a Streamlet representation of the spout object
+   */
+  <R> Streamlet<R> newSource(IRichSpout spout);
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
index 0eb06d9f1a..61715e8528 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.heron.api.spout.IRichSpout;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.Builder;
 import org.apache.heron.streamlet.SerializableSupplier;
@@ -47,7 +48,6 @@ public BuilderImpl() {
   public <R> Streamlet<R> newSource(SerializableSupplier<R> supplier) {
     StreamletUtils.require(supplier != null, "supplier must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createSupplierStreamlet(supplier);
-    retval.setNumPartitions(1);
     sources.add(retval);
     return retval;
   }
@@ -56,7 +56,14 @@ public BuilderImpl() {
   public <R> Streamlet<R> newSource(Source<R> generator) {
     StreamletUtils.require(generator != null, "source must not be null.");
     StreamletImpl<R> retval = StreamletImpl.createGeneratorStreamlet(generator);
-    retval.setNumPartitions(1);
+    sources.add(retval);
+    return retval;
+  }
+
+  @Override
+  public <R> Streamlet<R> newSource(IRichSpout spout) {
+    StreamletUtils.require(spout != null, "spout must not be null.");
+    StreamletImpl<R> retval = StreamletImpl.createSpoutStreamlet(spout);
     sources.add(retval);
     return retval;
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 066dc743fd..a05040b6a9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -26,6 +26,7 @@
 import java.util.Set;
 import java.util.logging.Logger;
 
+import org.apache.heron.api.spout.IRichSpout;
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.IStreamletOperator;
 import org.apache.heron.streamlet.JoinType;
@@ -54,6 +55,7 @@
 import org.apache.heron.streamlet.impl.streamlets.RemapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SinkStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SourceStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -115,6 +117,7 @@ public boolean allBuilt() {
     REMAP("remap"),
     SINK("sink"),
     SOURCE("generator"),
+    SPOUT("spout"),
     SUPPLIER("supplier"),
     TRANSFORM("transform"),
     UNION("union");
@@ -263,6 +266,14 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
     return new SourceStreamlet<T>(generator);
   }
 
+  /**
+   * Create a Streamlet based on a Spout object
+   * @param spout The spout that emits the elements of the streamlet
+   */
+  static <T> StreamletImpl<T> createSpoutStreamlet(IRichSpout spout) {
+    return new SpoutStreamlet<T>(spout);
+  }
+
   /**
    * Return a new Streamlet by applying mapFn to each element of this Streamlet
    * @param mapFn The Map Function that should be applied to each element
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java
new file mode 100644
index 0000000000..5bb12cf0ea
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/SpoutStreamlet.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.Set;
+
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * SpoutStreamlet is a quick way of creating a Streamlet
+ * from a user-supplied Spout object. The spout is the
+ * source of all tuples for this Streamlet.
+ */
+public class SpoutStreamlet<R> extends StreamletImpl<R> {
+  private IRichSpout spout;
+
+  public SpoutStreamlet(IRichSpout spout) {
+    this.spout = spout;
+    setNumPartitions(1);
+  }
+
+  @Override
+  public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
+    setDefaultNameIfNone(StreamletNamePrefix.SPOUT, stageNames);
+    bldr.setSpout(getName(), spout, getNumPartitions());
+    return true;
+  }
+}
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
index 0e5f13ead6..b299f52a84 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/Builder.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.heron.streamlet.scala
 
+import org.apache.heron.api.spout.IRichSpout
 import org.apache.heron.streamlet.scala.impl.BuilderImpl
 
 /**
@@ -49,7 +50,14 @@ trait Builder {
     * Creates a new Streamlet using the underlying generator
     *
     * @param generator The generator that generates the tuples of the streamlet
-    * @return  a Streamlet representation of the source object
+    * @return a Streamlet representation of the source object
     */
   def newSource[R](generator: Source[R]): Streamlet[R]
+
+  /**
+   * Creates a new Streamlet using the provided spout
+   * @param spout The spout that emits the tuples of the streamlet
+   * @return a Streamlet representation of the spout object
+   */
+  def newSource[R](spout: IRichSpout): Streamlet[R]
 }
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
index 0f012e4723..60203206c5 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/BuilderImpl.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.heron.streamlet.scala.impl
 
+import org.apache.heron.api.spout.IRichSpout
 import org.apache.heron.api.topology.TopologyBuilder
 import org.apache.heron.streamlet.impl.{BuilderImpl => JavaBuilderImpl}
 
@@ -39,6 +40,11 @@ class BuilderImpl(builder: org.apache.heron.streamlet.Builder)
     StreamletImpl.fromJavaStreamlet[R](newJavaStreamlet)
   }
 
+  override def newSource[R](spout: IRichSpout): Streamlet[R] = {
+    val newJavaStreamlet = builder.newSource[R](spout)
+    StreamletImpl.fromJavaStreamlet[R](newJavaStreamlet)
+  }
+
   def build(): TopologyBuilder =
     builder.asInstanceOf[JavaBuilderImpl].build()
 
diff --git a/heron/api/tests/java/org/apache/heron/resource/TestSpout.java b/heron/api/tests/java/org/apache/heron/resource/TestSpout.java
new file mode 100644
index 0000000000..49025bd5e9
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/resource/TestSpout.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+
+/**
+ * A Spout used for unit tests. It will:
+ * 1. Emit EMIT_COUNT tuples, each tagged with MESSAGE_ID.
+ * 2. Declare a single output field, "word", via the OutputFieldsDeclarer.
+ */
+
+@Ignore
+public class TestSpout extends BaseRichSpout {
+  private static final int EMIT_COUNT = 10;
+  private static final String MESSAGE_ID = "MESSAGE_ID";
+
+  private final String[] toSend = new String[]{"A", "B"};
+  private SpoutOutputCollector outputCollector;
+  private int emitted = 0;
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("word"));
+  }
+
+  @Override
+  public void open(
+      Map<String, Object> map,
+      TopologyContext topologyContext,
+      SpoutOutputCollector spoutOutputCollector) {
+    this.outputCollector = spoutOutputCollector;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void nextTuple() {
+    // It will emit A, B, A, B, A, B, A, B, A, B
+    if (emitted < EMIT_COUNT) {
+      String word = toSend[emitted % toSend.length];
+      emit(outputCollector, new Values(word), MESSAGE_ID, emitted++);
+    }
+  }
+
+  protected void emit(SpoutOutputCollector collector,
+                      List<Object> tuple, Object messageId, int emittedCount) {
+    collector.emit(tuple, messageId);
+  }
+}
+
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 8a0b22d91c..a9b15bdaa3 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -32,6 +32,7 @@
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.resource.TestBasicBolt;
 import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestSpout;
 import org.apache.heron.resource.TestWindowBolt;
 import org.apache.heron.streamlet.Config;
 import org.apache.heron.streamlet.Context;
@@ -49,6 +50,7 @@
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.SpoutStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.TransformStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.UnionStreamlet;
@@ -85,6 +87,13 @@ public void testSupplierStreamlet() throws Exception {
     assertTrue(streamlet instanceof SupplierStreamlet);
   }
 
+  @Test
+  public void testSpoutStreamlet() throws Exception {
+    TestSpout spout = new TestSpout();
+    Streamlet<Double> streamlet = StreamletImpl.createSpoutStreamlet(spout);
+    assertTrue(streamlet instanceof SpoutStreamlet);
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testMapStreamlet() throws Exception {
diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala b/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala
new file mode 100644
index 0000000000..e0b7e08525
--- /dev/null
+++ b/heron/api/tests/scala/org/apache/heron/resource/TestSpout.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource
+
+
+import java.util.{List => JList}
+import java.util.{Map => JMap}
+
+import org.junit.Ignore
+
+import org.apache.heron.api.spout.BaseRichSpout
+import org.apache.heron.api.spout.SpoutOutputCollector
+import org.apache.heron.api.topology.OutputFieldsDeclarer
+import org.apache.heron.api.topology.TopologyContext
+import org.apache.heron.api.tuple.Fields
+import org.apache.heron.api.tuple.Values
+
+/**
+ * A Spout used for unit tests. It will:
+ * 1. Emit EMIT_COUNT tuples, each tagged with MESSAGE_ID.
+ * 2. Declare a single output field, "word", via the OutputFieldsDeclarer.
+ */
+
+@Ignore
+class TestSpout extends BaseRichSpout {
+  private val EMIT_COUNT = 10
+  private val MESSAGE_ID = "MESSAGE_ID"
+
+  private val toSend = Array("A", "B")
+  private var outputCollector: SpoutOutputCollector = _
+  private var emitted = 0
+
+  override def declareOutputFields(outputFieldsDeclarer: OutputFieldsDeclarer ): Unit = {
+    outputFieldsDeclarer.declare(new Fields("word"))
+  }
+
+  override def open(
+      conf: JMap[String, Object],
+      topologyContext: TopologyContext,
+      spoutOutputCollector: SpoutOutputCollector): Unit = {
+    this.outputCollector = spoutOutputCollector
+  }
+
+  override def close(): Unit = {
+  }
+
+  override def nextTuple(): Unit = {
+    // It will emit A, B, A, B, A, B, A, B, A, B
+    if (emitted < EMIT_COUNT) {
+      val word = toSend(emitted % toSend.length)
+      emit(outputCollector, new Values(word), MESSAGE_ID, emitted)
+      emitted = emitted + 1
+    }
+  }
+
+  def emit(collector: SpoutOutputCollector,
+           tuple: JList[Object],
+           messageId: Object,
+           emittedCount: Int): Unit = {
+    collector.emit(tuple, messageId)
+  }
+}
diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
index cc04caad34..3d8048b4ef 100644
--- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
+++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/BuilderImplTest.scala
@@ -18,12 +18,14 @@
  */
 package org.apache.heron.streamlet.scala.impl
 
+import java.util.{Map => JMap}
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.junit.Assert.assertEquals
 
+import org.apache.heron.resource.TestSpout
 import org.apache.heron.streamlet.Context
-
 import org.apache.heron.streamlet.scala.{Builder, Streamlet, Source}
 import org.apache.heron.streamlet.scala.common.BaseFunSuite
 
@@ -69,4 +71,16 @@ class BuilderImplTest extends BaseFunSuite {
     override def cleanup(): Unit = numbers.clear()
   }
 
+  test(
+    "BuilderImpl should support streamlet generation from a user defined spout") {
+    val spout = new TestSpout
+    val spoutStreamletObj = Builder.newBuilder
+      .newSource(spout)
+      .setName("Spout_Streamlet_1")
+      .setNumPartitions(20)
+
+    assert(spoutStreamletObj.isInstanceOf[Streamlet[_]])
+    assertEquals("Spout_Streamlet_1", spoutStreamletObj.getName)
+    assertEquals(20, spoutStreamletObj.getNumPartitions)
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services