You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@heron.apache.org by GitBox <gi...@apache.org> on 2018/04/06 05:04:45 UTC

[GitHub] kramasamy closed pull request #2851: Joshfischer/eco stateful topology

kramasamy closed pull request #2851: Joshfischer/eco stateful topology
URL: https://github.com/apache/incubator-heron/pull/2851
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java
new file mode 100644
index 0000000000..6ece43cd31
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java
@@ -0,0 +1,51 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Random;
+
+/**
+ * Generates fixed-length random strings made up of the characters {@code 0-9} and {@code a-z}.
+ *
+ * <p>Each instance reuses one internal buffer and one {@link Random} without any
+ * synchronization, so a single instance should not be shared between threads.
+ */
+public class RandomString {
+  /** Alphabet that generated strings are drawn from: the ten digits followed by a-z. */
+  private static final char[] SYMBOLS = "0123456789abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+  private final Random random = new Random();
+
+  /** Scratch buffer refilled on every {@link #nextString()} call; its length never changes. */
+  private final char[] buf;
+
+  /**
+   * Creates a generator whose strings are exactly {@code length} characters long.
+   *
+   * @param length number of characters in each generated string; must be at least 1
+   * @throws IllegalArgumentException if {@code length} is less than 1
+   */
+  public RandomString(int length) {
+    // Validate first so that an invalid argument fails before any state is set up.
+    if (length < 1) {
+      throw new IllegalArgumentException("length < 1: " + length);
+    }
+    buf = new char[length];
+  }
+
+  /**
+   * Fills the internal buffer with randomly chosen symbols and returns its content as a string.
+   *
+   * @return a new string of the configured length containing only characters from the alphabet
+   */
+  public String nextString() {
+    for (int idx = 0; idx < buf.length; ++idx) {
+      buf[idx] = SYMBOLS[random.nextInt(SYMBOLS.length)];
+    }
+    return new String(buf);
+  }
+}
diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java
new file mode 100644
index 0000000000..bd5cbfee1e
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java
@@ -0,0 +1,66 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+
+import com.twitter.heron.api.bolt.BaseRichBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Tuple;
+
+
+/**
+ * Bolt that counts how often each integer key has been received, keeping the counts in the
+ * {@link State} object handed to {@link #initState(State)}.
+ */
+public class StatefulConsumerBolt extends BaseRichBolt
+    implements IStatefulComponent<Integer, Integer> {
+  private static final long serialVersionUID = -5470591933906954522L;
+
+  private OutputCollector collector;
+  private State<Integer, Integer> myState;
+
+  /**
+   * Stores the supplied state; {@link #execute(Tuple)} reads and updates it directly.
+   *
+   * @param state key-to-count state used by this bolt
+   */
+  @Override
+  public void initState(State<Integer, Integer> state) {
+    this.myState = state;
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    // Nothing really since we operate out of the system supplied state
+  }
+
+  /**
+   * Keeps a reference to the output collector.
+   *
+   * @param map topology configuration (unused here)
+   * @param topologyContext topology context (unused here)
+   * @param outputCollector collector stored for this bolt
+   */
+  @Override
+  @SuppressWarnings("rawtypes")
+  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+    collector = outputCollector;
+  }
+
+  /**
+   * Increments the count stored for the integer in field 0 of the tuple, starting at 1 for a
+   * key that is not yet present in the state.
+   *
+   * @param tuple input whose first field is read as the integer key
+   */
+  @Override
+  public void execute(Tuple tuple) {
+    int key = tuple.getInteger(0);
+    System.out.println("looking in state for: " + key);
+    // Look the key up once and reuse the result for both the presence check and the increment.
+    Integer count = myState.get(key);
+    if (count == null) {
+      System.out.println("did not find " + key + " in state: ");
+      myState.put(key, 1);
+    } else {
+      System.out.println("found in state: " + key);
+      myState.put(key, count + 1);
+    }
+  }
+
+  /** Declares no output fields; nothing in this class emits through the collector. */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+  }
+}
diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java
new file mode 100644
index 0000000000..c3fb3db84f
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java
@@ -0,0 +1,83 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.utils.Utils;
+
+/**
+ * Spout that emits an incrementing counter and keeps that counter in the supplied {@link State}.
+ *
+ * <p>Every tuple has three fields: "value" (the counter), "ts" (the current time minus 24
+ * hours) and "msgid" (the counter again). The counter is read back in
+ * {@link #initState(State)} and written out in {@link #preSave(String)}.
+ */
+@SuppressWarnings("HiddenField")
+public class StatefulNumberSpout extends BaseRichSpout
+    implements IStatefulComponent<String, Long> {
+  private static final Logger LOG = Logger.getLogger(StatefulNumberSpout.class.getName());
+  private static final long serialVersionUID = 5454291010750852782L;
+
+  /** Offset subtracted from the current time to build the "ts" field: 24 hours in millis. */
+  private static final long ONE_DAY_MILLIS = 24L * 60 * 60 * 1000;
+
+  private SpoutOutputCollector collector;
+  // NOTE(review): assigned in open() but never read anywhere in this class.
+  private Random rand;
+  private long msgId;
+  private State<String, Long> state;
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("value", "ts", "msgid"));
+  }
+
+  /**
+   * Stores the collector used by {@link #nextTuple()} and creates the random generator.
+   *
+   * @param conf topology configuration (unused here)
+   * @param context topology context (unused here)
+   * @param collector collector that tuples are emitted through
+   */
+  @Override
+  public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector
+      collector) {
+    this.collector = collector;
+    this.rand = new Random();
+  }
+
+  /**
+   * Waits via {@code Utils.sleep(1000)}, emits one tuple using the current counter value as its
+   * message id, then increments the counter.
+   */
+  @Override
+  public void nextTuple() {
+    Utils.sleep(1000);
+    // Despite being emitted next to a counter, this is a timestamp, not a random number.
+    long timestamp = System.currentTimeMillis() - ONE_DAY_MILLIS;
+    System.out.println("Emitting: " + msgId);
+    collector.emit(new Values(msgId, timestamp, msgId), msgId);
+    msgId++;
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    LOG.fine("Got ACK for msgId : " + msgId);
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    LOG.fine("Got FAIL for msgId : " + msgId);
+  }
+
+  /**
+   * Stores the supplied state and restores the counter from its "msgId" entry, defaulting to 0
+   * when the entry is absent.
+   *
+   * @param state state object holding the persisted counter
+   */
+  @Override
+  public void initState(State<String, Long> state) {
+    this.state = state;
+    this.msgId = this.state.getOrDefault("msgId", 0L);
+  }
+
+  /**
+   * Writes the current counter into the state under the key "msgId".
+   *
+   * @param checkpointId identifier of the checkpoint being saved (unused here)
+   */
+  @Override
+  public void preSave(String checkpointId) {
+    this.state.put("msgId", msgId);
+  }
+}
diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java
new file mode 100644
index 0000000000..e128840b7f
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java
@@ -0,0 +1,73 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * Spout that emits one random integer between 1 and 100 (inclusive) per {@link #nextTuple()}
+ * call in a single field named "random-int".
+ *
+ * <p>NOTE(review): the state received in {@link #initState(State)} is stored but never read or
+ * written in this class, and {@link #preSave(String)} only prints a message.
+ */
+public class StatefulRandomIntSpout extends BaseRichSpout
+    implements IStatefulComponent<String, Integer> {
+  // Declared explicitly, like the other spouts and bolts in this package.
+  private static final long serialVersionUID = -3217886193225455451L;
+
+  private SpoutOutputCollector spoutOutputCollector;
+  private State<String, Integer> count;
+
+  public StatefulRandomIntSpout() {
+  }
+
+  // Generates a random integer between 1 and 100
+  private int randomInt() {
+    // The upper bound of nextInt(origin, bound) is exclusive, hence 101.
+    return ThreadLocalRandom.current().nextInt(1, 101);
+  }
+
+  // These two methods are required to implement the IStatefulComponent interface
+  @Override
+  public void preSave(String checkpointId) {
+    System.out.println(String.format("Saving spout state at checkpoint %s", checkpointId));
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    count = state;
+  }
+
+  // These three methods are required to extend the BaseRichSpout abstract class
+  @Override
+  public void open(Map<String, Object> map, TopologyContext ctx, SpoutOutputCollector collector) {
+    spoutOutputCollector = collector;
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("random-int"));
+  }
+
+  /**
+   * Waits via {@code Utils.sleep(2000)}, then emits one random integer without a message id.
+   */
+  @Override
+  public void nextTuple() {
+    // NOTE(review): Utils is imported from backtype.storm.utils in this file, whereas
+    // StatefulNumberSpout uses com.twitter.heron.api.utils.Utils - confirm this is intended.
+    Utils.sleep(2000);
+    int randomInt = randomInt();
+    System.out.println("Emitting Value: " + randomInt);
+    spoutOutputCollector.emit(new Values(randomInt));
+  }
+}
+
diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java
new file mode 100644
index 0000000000..8f721e467c
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java
@@ -0,0 +1,67 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+
+import com.twitter.heron.api.bolt.BaseStatefulWindowedBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.windowing.TupleWindow;
+
+@SuppressWarnings("HiddenField")
+public class StatefulWindowSumBolt extends BaseStatefulWindowedBolt<String, Long> {
+  private static final long serialVersionUID = -539382497249834244L;
+  private State<String, Long> state;
+  private long sum;
+
+  private OutputCollector collector;
+
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context,
+                      OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  @Override
+  public void initState(State<String, Long> state) {
+    this.state = state;
+    sum = state.getOrDefault("sum", 0L);
+  }
+
+  @Override
+  public void execute(TupleWindow inputWindow) {
+    for (Tuple tuple : inputWindow.get()) {
+      System.out.println("Adding to sum: " + tuple.getLongByField("value"));
+      sum += tuple.getLongByField("value");
+      System.out.println("Sum is now: " + sum);
+    }
+    collector.emit(new Values(sum));
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("sum"));
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    state.put("sum", sum);
+  }
+}
diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java
new file mode 100644
index 0000000000..3f3b6f1ea3
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java
@@ -0,0 +1,64 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+import java.util.Random;
+
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+
+/**
+ * Spout that emits randomly picked strings, in a single field named "word", from a pool of
+ * random strings generated when the spout is opened.
+ */
+@SuppressWarnings("HiddenField")
+public class WordSpout extends BaseRichSpout {
+  private static final long serialVersionUID = 4322775001819135036L;
+
+  /** Number of strings in the pool. */
+  private static final int ARRAY_LENGTH = 128 * 1024;
+  /** Length of every string in the pool. */
+  private static final int WORD_LENGTH = 20;
+
+  private final String[] words = new String[ARRAY_LENGTH];
+  // Seeded with a fixed value, so the sequence of picked indices is the same for every instance.
+  private final Random rnd = new Random(31);
+  private SpoutOutputCollector collector;
+
+  /**
+   * Fills the pool with {@code ARRAY_LENGTH} random strings and stores the collector.
+   *
+   * @param map topology configuration (unused here)
+   * @param topologyContext topology context (unused here)
+   * @param spoutOutputCollector collector that tuples are emitted through
+   */
+  @Override
+  @SuppressWarnings("rawtypes")
+  public void open(Map map, TopologyContext topologyContext,
+                   SpoutOutputCollector spoutOutputCollector) {
+    System.out.println("open spout");
+    RandomString generator = new RandomString(WORD_LENGTH);
+    for (int slot = 0; slot < words.length; slot++) {
+      words[slot] = generator.nextString();
+    }
+    this.collector = spoutOutputCollector;
+  }
+
+  /** Emits one string taken from a random position of the pool. */
+  @Override
+  public void nextTuple() {
+    System.out.println("next tuple");
+    String word = words[rnd.nextInt(ARRAY_LENGTH)];
+    collector.emit(new Values(word));
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("word"));
+  }
+}
diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml
new file mode 100644
index 0000000000..9c9d5cf3da
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml
@@ -0,0 +1,52 @@
+#  Copyright 2017 Twitter. All rights reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+name: "stateful-windowing"
+type: "heron"
+
+config:
+  topology.workers: 1
+  topology.reliability.mode: "EFFECTIVELY_ONCE"
+
+components:  # objects defined here are referenced by id through "ref:" in the bolt section
+
+  - id: "windowLength"  # first argument given to withWindow below
+    className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 5
+
+  - id: "slidingInterval"  # second argument given to withWindow below
+    className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 3
+
+spouts:
+  - id: "integer-spout"
+    className: "com.twitter.heron.examples.eco.StatefulNumberSpout"
+    parallelism: 1
+
+bolts:
+  - id: "stateful-window-sum-bolt"
+    className: "com.twitter.heron.examples.eco.StatefulWindowSumBolt"
+    configMethods:
+      - name: "withWindow"  # receives the two Count components defined under "components"
+        args: [ref: "windowLength", ref: "slidingInterval"]
+    parallelism: 1
+
+
+streams:  # connects integer-spout to stateful-window-sum-bolt
+  - from: "integer-spout"
+    to: "stateful-window-sum-bolt"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file
diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml
new file mode 100644
index 0000000000..e5838afea8
--- /dev/null
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml
@@ -0,0 +1,38 @@
+#  Copyright 2017 Twitter. All rights reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+name: "stateful-word-count"  # NOTE(review): the spout below emits random integers, not words
+type: "heron"
+
+config:
+  topology.workers: 1
+  topology.reliability.mode: "EFFECTIVELY_ONCE"
+
+
+spouts:
+  - id: "int-spout"
+    className: "com.twitter.heron.examples.eco.StatefulRandomIntSpout"
+    parallelism: 1
+
+bolts:
+  - id: "stateful-consumer-bolt"
+    className: "com.twitter.heron.examples.eco.StatefulConsumerBolt"
+    parallelism: 1
+
+
+streams:  # connects int-spout to stateful-consumer-bolt
+  - from: "int-spout"
+    to: "stateful-consumer-bolt"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services