You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ka...@apache.org on 2018/04/05 14:35:51 UTC
[incubator-heron] branch master updated: Eco for heron topologies
(#2849)
This is an automated email from the ASF dual-hosted git repository.
karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 98aa2a2 Eco for heron topologies (#2849)
98aa2a2 is described below
commit 98aa2a22ab655a97e4a0f4a9cc87ad49981f2947
Author: Karthik Ramasamy <ka...@streaml.io>
AuthorDate: Thu Apr 5 07:35:48 2018 -0700
Eco for heron topologies (#2849)
* added cereal serialization/deserialization library
* initial restructuring for heron eco
* support for heron eco
* add the pivot
* fixing storm submitter checkstyles
* fixing eco checkstyles
* checkstyles
* added heron eco tests
* fixing check styles
* add submitter test
* fixing ecoSubmitter test
* fixing eco submitter test for storm topology
* clean up
* adding test to verify heron topology submitter
* adding test to verify heron topology submitter
* fixing checkstyles?
* fixed the check style
* added the check for topology type to be rigorous
* added heron eco examples
* modify travis and intellij scripts
* fix checkstyle issues
---
.../src/java/BUILD | 2 +-
.../twitter/heron/examples/eco/EvenAndOddBolt.java | 14 +--
.../twitter/heron/examples/eco/LogInfoBolt.java | 8 +-
.../heron/examples/eco/TestFibonacciSpout.java | 15 +--
.../heron/examples/eco/TestIBasicPrintBolt.java | 14 +--
.../heron/examples/eco/TestNameCounter.java | 12 +-
.../twitter/heron/examples/eco/TestNameSpout.java | 18 +--
.../twitter/heron/examples/eco/TestPrintBolt.java | 8 +-
.../heron/examples/eco/TestPropertyHolder.java | 0
.../com/twitter/heron/examples/eco/TestUnits.java | 0
.../twitter/heron/examples/eco/TestWindowBolt.java | 14 +--
.../heron/examples/eco/heron_fibonacci.yaml | 5 +-
.../heron/examples/eco/heron_windowing.yaml | 9 +-
.../heron/examples/eco/heron_wordcount.yaml | 5 +-
.../twitter/heron/examples/eco/sample.properties | 0
.../src/java/BUILD | 10 +-
.../twitter/heron/examples/eco/EvenAndOddBolt.java | 0
.../twitter/heron/examples/eco/LogInfoBolt.java | 0
.../heron/examples/eco/TestFibonacciSpout.java | 0
.../heron/examples/eco/TestIBasicPrintBolt.java | 0
.../heron/examples/eco/TestNameCounter.java | 0
.../twitter/heron/examples/eco/TestNameSpout.java | 0
.../twitter/heron/examples/eco/TestPrintBolt.java | 0
.../heron/examples/eco/TestPropertyHolder.java | 0
.../com/twitter/heron/examples/eco/TestUnits.java | 0
.../twitter/heron/examples/eco/TestWindowBolt.java | 0
.../com/twitter/heron/examples/eco/fibonacci.yaml | 0
.../twitter/heron/examples/eco/sample.properties | 0
.../heron/examples/eco/simple_windowing.yaml | 0
.../heron/examples/eco/simple_wordcount.yaml | 0
eco/src/java/BUILD | 53 ++++++++-
eco/src/java/com/twitter/heron/eco/Eco.java | 87 +++++++++-----
.../com/twitter/heron/eco/builder/BoltBuilder.java | 2 +-
.../heron/eco/builder/ComponentBuilder.java | 2 +-
.../twitter/heron/eco/builder/ConfigBuilder.java | 7 +-
.../heron/eco/builder/{ => heron}/EcoBuilder.java | 11 +-
.../eco/builder/{ => heron}/SpoutBuilder.java | 8 +-
.../eco/builder/{ => heron}/StreamBuilder.java | 20 ++--
.../heron/eco/builder/{ => storm}/EcoBuilder.java | 7 +-
.../eco/builder/{ => storm}/SpoutBuilder.java | 4 +-
.../eco/builder/{ => storm}/StreamBuilder.java | 4 +-
.../eco/definition/EcoTopologyDefinition.java | 18 ++-
.../com/twitter/heron/eco/submit/EcoSubmitter.java | 18 ++-
eco/tests/java/BUILD | 39 ++++++-
eco/tests/java/com/twitter/heron/eco/EcoTest.java | 48 ++++----
.../HeronEcoBuilderTest.java} | 12 +-
.../HeronSpoutBuilderTest.java} | 16 +--
.../HeronStreamBuilderTest.java} | 46 ++++----
.../StormEcoBuilderTest.java} | 9 +-
.../StormSpoutBuilderTest.java} | 6 +-
.../StormStreamBuilderTest.java} | 6 +-
.../twitter/heron/eco/parser/EcoParserTest.java | 2 +
.../twitter/heron/eco/submit/EcoSubmitterTest.java | 24 +++-
scripts/get_all_heron_paths.sh | 4 +-
scripts/travis/build.sh | 6 +-
third_party/cereal/BUILD | 126 +++++++++++++++++++++
third_party/cereal/cereal-1.2.1.tar.gz | Bin 0 -> 301689 bytes
third_party/cereal/empty.cc | 0
tools/rules/heron_examples.bzl | 6 +-
59 files changed, 530 insertions(+), 195 deletions(-)
diff --git a/eco-examples/src/java/BUILD b/eco-heron-examples/src/java/BUILD
similarity index 92%
copy from eco-examples/src/java/BUILD
copy to eco-heron-examples/src/java/BUILD
index 1ac3dbe..1051d32 100644
--- a/eco-examples/src/java/BUILD
+++ b/eco-heron-examples/src/java/BUILD
@@ -2,7 +2,7 @@ package(default_visibility = ["//visibility:public"])
filegroup(
name = "heron-eco-examples-support",
- srcs = glob(["**/*.yaml", "**/*.properties"]),
+ srcs = glob(["**/*.yaml"]),
)
java_binary(
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
similarity index 83%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
index 65487f2..bd60a49 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
@@ -16,15 +16,15 @@ package com.twitter.heron.examples.eco;
import java.util.Map;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.bolt.BasicOutputCollector;
+import com.twitter.heron.api.bolt.IBasicBolt;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
-import static org.apache.storm.utils.Utils.tuple;
+import static com.twitter.heron.api.utils.Utils.tuple;
@SuppressWarnings({"serial", "rawtypes", "unchecked"})
public class EvenAndOddBolt implements IBasicBolt {
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
similarity index 84%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
index 3271178..d7be2c6 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
@@ -16,10 +16,10 @@ package com.twitter.heron.examples.eco;
import java.util.logging.Logger;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
+import com.twitter.heron.api.bolt.BaseBasicBolt;
+import com.twitter.heron.api.bolt.BasicOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.tuple.Tuple;
/**
* Simple bolt that does nothing other than LOG.info() every tuple received.
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
similarity index 85%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
index f3f11e4..fe19768 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
@@ -17,13 +17,14 @@ import java.util.Map;
import java.util.Random;
import java.util.logging.Logger;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.utils.Utils;
@SuppressWarnings({"serial", "rawtypes", "HiddenField"})
public class TestFibonacciSpout extends BaseRichSpout {
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
similarity index 83%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
index 8d5d4c9..4f23f4d 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
@@ -15,14 +15,14 @@ package com.twitter.heron.examples.eco;
import java.util.Map;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
+import com.twitter.heron.api.bolt.BasicOutputCollector;
+import com.twitter.heron.api.bolt.IBasicBolt;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
-import static org.apache.storm.utils.Utils.tuple;
+import static com.twitter.heron.api.utils.Utils.tuple;
@SuppressWarnings({"serial", "rawtypes", "unchecked", "HiddenField"})
public class TestIBasicPrintBolt implements IBasicBolt {
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
similarity index 83%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
index 00b5f64..ba02368 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
@@ -16,12 +16,12 @@ package com.twitter.heron.examples.eco;
import java.util.HashMap;
import java.util.Map;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
+import com.twitter.heron.api.bolt.BaseBasicBolt;
+import com.twitter.heron.api.bolt.BasicOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
import static com.twitter.heron.api.utils.Utils.tuple;
@SuppressWarnings({"serial", "rawtypes"})
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
similarity index 81%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
index 4e7e8a5..355d9b1 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
@@ -17,14 +17,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-import org.apache.storm.Config;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.utils.Utils;
@SuppressWarnings({"serial", "HiddenField"})
public class TestNameSpout extends BaseRichSpout {
@@ -72,7 +72,7 @@ public class TestNameSpout extends BaseRichSpout {
public Map<String, Object> getComponentConfiguration() {
if (!isdistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
- ret.put(Config.TOPOLOGY_WORKERS, 1);
+ ret.put(Config.TOPOLOGY_STMGRS, 1);
return ret;
} else {
return null;
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
similarity index 81%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
index f9cb1d6..b48cf31 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
@@ -13,10 +13,10 @@
// limitations under the License.
package com.twitter.heron.examples.eco;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
+import com.twitter.heron.api.bolt.BaseBasicBolt;
+import com.twitter.heron.api.bolt.BasicOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.tuple.Tuple;
@SuppressWarnings("serial")
public class TestPrintBolt extends BaseBasicBolt {
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java
similarity index 100%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java
similarity index 100%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
similarity index 77%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
index ef6046c..34d5c76 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
@@ -15,13 +15,13 @@ package com.twitter.heron.examples.eco;
import java.util.Map;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.windowing.TupleWindow;
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.windowing.TupleWindow;
@SuppressWarnings({"serial", "HiddenField"})
public class TestWindowBolt extends BaseWindowedBolt {
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/fibonacci.yaml b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_fibonacci.yaml
similarity index 96%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/fibonacci.yaml
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_fibonacci.yaml
index 7ebb2d7..7a9f9a2 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/fibonacci.yaml
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_fibonacci.yaml
@@ -14,7 +14,8 @@
---
-name: "fibonacci-topology"
+name: "heron-fibonacci-topology"
+type: "heron"
config:
topology.workers: 1
@@ -71,4 +72,4 @@ streams:
to: "sys-out-bolt"
grouping:
type: SHUFFLE
- streamId: "evens"
\ No newline at end of file
+ streamId: "evens"
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/simple_windowing.yaml b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_windowing.yaml
similarity index 87%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/simple_windowing.yaml
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_windowing.yaml
index 777bfb6..49adf3c 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/simple_windowing.yaml
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_windowing.yaml
@@ -14,15 +14,16 @@
---
-name: "sliding-window-topology"
+name: "heron-sliding-window-topology"
+type: "heron"
components:
- id: "windowLength"
- className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
+ className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count"
constructorArgs:
- 5
- id: "slidingInterval"
- className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
+ className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count"
constructorArgs:
- 3
@@ -62,4 +63,4 @@ streams:
- from: "bolt-1"
to: "bolt-2"
grouping:
- type: SHUFFLE
\ No newline at end of file
+ type: SHUFFLE
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/simple_wordcount.yaml b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_wordcount.yaml
similarity index 96%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/simple_wordcount.yaml
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_wordcount.yaml
index 6d18b73..a195699 100644
--- a/eco-examples/src/java/com/twitter/heron/examples/eco/simple_wordcount.yaml
+++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron_wordcount.yaml
@@ -16,7 +16,8 @@
# topology definition
# name to be used when submitting
-name: "simple-wordcount-topology"
+name: "heron-simple-wordcount-topology"
+type: "heron"
# topology configuration
# this will be passed to the submitter as a map of config options
@@ -69,4 +70,4 @@ streams:
- from: "bolt-1"
to: "bolt-2"
grouping:
- type: SHUFFLE
\ No newline at end of file
+ type: SHUFFLE
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/sample.properties b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/sample.properties
similarity index 100%
copy from eco-examples/src/java/com/twitter/heron/examples/eco/sample.properties
copy to eco-heron-examples/src/java/com/twitter/heron/examples/eco/sample.properties
diff --git a/eco-examples/src/java/BUILD b/eco-storm-examples/src/java/BUILD
similarity index 69%
rename from eco-examples/src/java/BUILD
rename to eco-storm-examples/src/java/BUILD
index 1ac3dbe..6ef748d 100644
--- a/eco-examples/src/java/BUILD
+++ b/eco-storm-examples/src/java/BUILD
@@ -1,12 +1,12 @@
package(default_visibility = ["//visibility:public"])
filegroup(
- name = "heron-eco-examples-support",
+ name = "storm-eco-examples-support",
srcs = glob(["**/*.yaml", "**/*.properties"]),
)
java_binary(
- name='eco-examples-unshaded',
+ name='storm-eco-examples-unshaded',
srcs = glob(["com/twitter/heron/examples/eco/**/*.java"]),
deps = [
"//heron/api/src/java:api-java-low-level",
@@ -18,8 +18,8 @@ java_binary(
)
genrule(
- name = 'heron-eco-examples',
- srcs = [":eco-examples-unshaded_deploy.jar"],
- outs = ["heron-eco-examples.jar"],
+ name = 'storm-eco-examples',
+ srcs = [":storm-eco-examples-unshaded_deploy.jar"],
+ outs = ["storm-eco-examples.jar"],
cmd = "cp $< $@",
)
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/EvenAndOddBolt.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestFibonacciSpout.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestIBasicPrintBolt.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestPropertyHolder.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestUnits.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/fibonacci.yaml b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/fibonacci.yaml
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/fibonacci.yaml
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/fibonacci.yaml
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/sample.properties b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/sample.properties
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/sample.properties
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/sample.properties
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/simple_windowing.yaml b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/simple_windowing.yaml
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/simple_windowing.yaml
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/simple_windowing.yaml
diff --git a/eco-examples/src/java/com/twitter/heron/examples/eco/simple_wordcount.yaml b/eco-storm-examples/src/java/com/twitter/heron/examples/eco/simple_wordcount.yaml
similarity index 100%
rename from eco-examples/src/java/com/twitter/heron/examples/eco/simple_wordcount.yaml
rename to eco-storm-examples/src/java/com/twitter/heron/examples/eco/simple_wordcount.yaml
diff --git a/eco/src/java/BUILD b/eco/src/java/BUILD
index d5c81b9..158fe1a 100644
--- a/eco/src/java/BUILD
+++ b/eco/src/java/BUILD
@@ -12,7 +12,56 @@ eco_deps = [
]
java_library(
+ name = "eco-defs-java",
+ srcs = glob(["com/twitter/heron/eco/definition/*.java"]),
+ deps = eco_deps,
+)
+
+java_library(
+ name = "eco-parser-java",
+ srcs = glob(["com/twitter/heron/eco/parser/*.java"]),
+ deps = eco_deps + [":eco-defs-java"],
+)
+
+java_library(
+ name = "eco-builder-java",
+ srcs = glob(["com/twitter/heron/eco/builder/*.java"]),
+ deps = eco_deps + [":eco-defs-java"],
+)
+
+java_library(
+ name = "eco-storm-builder-java",
+ srcs = glob(["com/twitter/heron/eco/builder/storm/*.java"]),
+ deps = eco_deps + [
+ ":eco-builder-java",
+ ":eco-defs-java",
+ ],
+)
+
+java_library(
+ name = "eco-heron-builder-java",
+ srcs = glob(["com/twitter/heron/eco/builder/heron/*.java"]),
+ deps = eco_deps + [
+ ":eco-builder-java",
+ ":eco-defs-java",
+ ],
+)
+
+java_library(
+ name = "eco-submit-java",
+ srcs = glob(["com/twitter/heron/eco/submit/*.java"]),
+ deps = eco_deps,
+)
+
+java_library(
name = "eco-java",
- srcs = glob(["com/twitter/heron/eco/**/*.java"]),
- deps = eco_deps
+ srcs = glob(["com/twitter/heron/eco/*.java"]),
+ deps = eco_deps + [
+ ":eco-defs-java",
+ ":eco-parser-java",
+ ":eco-builder-java",
+ ":eco-heron-builder-java",
+ ":eco-storm-builder-java",
+ ":eco-submit-java",
+ ],
)
diff --git a/eco/src/java/com/twitter/heron/eco/Eco.java b/eco/src/java/com/twitter/heron/eco/Eco.java
index f5cb6bb..77b4dd8 100644
--- a/eco/src/java/com/twitter/heron/eco/Eco.java
+++ b/eco/src/java/com/twitter/heron/eco/Eco.java
@@ -15,6 +15,7 @@ package com.twitter.heron.eco;
import java.io.File;
import java.io.FileInputStream;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
@@ -23,17 +24,13 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.storm.topology.TopologyBuilder;
import com.twitter.heron.api.Config;
import com.twitter.heron.eco.builder.BoltBuilder;
import com.twitter.heron.eco.builder.BuilderUtility;
import com.twitter.heron.eco.builder.ComponentBuilder;
import com.twitter.heron.eco.builder.ConfigBuilder;
-import com.twitter.heron.eco.builder.EcoBuilder;
import com.twitter.heron.eco.builder.ObjectBuilder;
-import com.twitter.heron.eco.builder.SpoutBuilder;
-import com.twitter.heron.eco.builder.StreamBuilder;
import com.twitter.heron.eco.definition.BoltDefinition;
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
@@ -50,16 +47,22 @@ public class Eco {
private static final String ENV_PROPS = "env-props";
private static final String ECO_CONFIG_FILE = "eco-config-file";
- private EcoBuilder ecoBuilder;
private EcoParser ecoParser;
private EcoSubmitter ecoSubmitter;
- public Eco(EcoBuilder ecoBuilder, EcoParser ecoParser, EcoSubmitter ecoSubmitter) {
- this.ecoBuilder = ecoBuilder;
+ public Eco(EcoParser ecoParser, EcoSubmitter ecoSubmitter) {
this.ecoParser = ecoParser;
this.ecoSubmitter = ecoSubmitter;
}
+ /**
+ * Submit an ECO topology
+ *
+ * @param fileInputStream The input stream associated with ECO topology definition file
+ * @param propertiesFile The optional key-value property file for optional property substitution.
+ * @param envFilter The optional flag to tell ECO to perform environment variable substitution
+ * @throws Exception
+ */
public void submit(FileInputStream fileInputStream,
FileInputStream propertiesFile, boolean envFilter)
throws Exception {
@@ -67,21 +70,61 @@ public class Eco {
.parseFromInputStream(fileInputStream, propertiesFile, envFilter);
String topologyName = topologyDefinition.getName();
+ String topologyType = topologyDefinition.getType();
- Config topologyConfig = ecoBuilder
- .buildConfig(topologyDefinition);
+ if ("storm".equals(topologyType)) {
+ System.out.println("topology type is Storm");
+ com.twitter.heron.eco.builder.storm.EcoBuilder ecoBuilder =
+ new com.twitter.heron.eco.builder.storm.EcoBuilder(
+ new com.twitter.heron.eco.builder.storm.SpoutBuilder(),
+ new BoltBuilder(),
+ new com.twitter.heron.eco.builder.storm.StreamBuilder(),
+ new ComponentBuilder(),
+ new ConfigBuilder());
+
+ Config topologyConfig = ecoBuilder
+ .buildConfig(topologyDefinition);
+
+ EcoExecutionContext executionContext
+ = new EcoExecutionContext(topologyDefinition, topologyConfig);
- EcoExecutionContext executionContext
- = new EcoExecutionContext(topologyDefinition, topologyConfig);
+ printTopologyInfo(executionContext);
- printTopologyInfo(executionContext);
+ ObjectBuilder objectBuilder = new ObjectBuilder();
+ objectBuilder.setBuilderUtility(new BuilderUtility());
+
+ org.apache.storm.topology.TopologyBuilder builder = ecoBuilder
+ .buildTopologyBuilder(executionContext, objectBuilder);
+ ecoSubmitter.submitStormTopology(topologyName, topologyConfig, builder.createTopology());
+ } else if ("heron".equals(topologyType)) {
+ System.out.println("topology type is Heron");
+ com.twitter.heron.eco.builder.heron.EcoBuilder ecoBuilder =
+ new com.twitter.heron.eco.builder.heron.EcoBuilder(
+ new com.twitter.heron.eco.builder.heron.SpoutBuilder(),
+ new BoltBuilder(),
+ new com.twitter.heron.eco.builder.heron.StreamBuilder(),
+ new ComponentBuilder(),
+ new ConfigBuilder());
- ObjectBuilder objectBuilder = new ObjectBuilder();
- objectBuilder.setBuilderUtility(new BuilderUtility());
- TopologyBuilder builder = ecoBuilder
- .buildTopologyBuilder(executionContext, objectBuilder);
+ Config topologyConfig = ecoBuilder
+ .buildConfig(topologyDefinition);
- ecoSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
+ EcoExecutionContext executionContext
+ = new EcoExecutionContext(topologyDefinition, topologyConfig);
+
+ printTopologyInfo(executionContext);
+
+ ObjectBuilder objectBuilder = new ObjectBuilder();
+ objectBuilder.setBuilderUtility(new BuilderUtility());
+
+ com.twitter.heron.api.topology.TopologyBuilder builder = ecoBuilder
+ .buildTopologyBuilder(executionContext, objectBuilder);
+ ecoSubmitter.submitHeronTopology(topologyName, topologyConfig, builder.createTopology());
+ } else {
+ LOG.log(Level.SEVERE,
+ String.format("Unknown topology type \'%s\' for topology %s, not submitted",
+ topologyType, topologyName));
+ }
}
public static void main(String[] args) throws Exception {
@@ -109,15 +152,7 @@ public class Eco {
Boolean filterFromEnv = cmd.hasOption(ENV_PROPS);
- Eco eco = new Eco(
- new EcoBuilder(
- new SpoutBuilder(),
- new BoltBuilder(),
- new StreamBuilder(),
- new ComponentBuilder(),
- new ConfigBuilder()),
- new EcoParser(),
- new EcoSubmitter());
+ Eco eco = new Eco(new EcoParser(), new EcoSubmitter());
eco.submit(fin, propsInputStream, filterFromEnv);
}
diff --git a/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java
index 6af6ac9..5259db5 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java
@@ -21,7 +21,7 @@ import com.twitter.heron.eco.definition.ObjectDefinition;
public class BoltBuilder {
- protected void buildBolts(EcoExecutionContext executionContext,
+ public void buildBolts(EcoExecutionContext executionContext,
ObjectBuilder objectBuilder)
throws IllegalAccessException, InstantiationException, ClassNotFoundException,
NoSuchFieldException, InvocationTargetException {
diff --git a/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java
index 9fe6023..3e7be34 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java
@@ -20,7 +20,7 @@ import com.twitter.heron.eco.definition.BeanDefinition;
import com.twitter.heron.eco.definition.EcoExecutionContext;
public class ComponentBuilder {
- protected void buildComponents(EcoExecutionContext context, ObjectBuilder objectBuilder)
+ public void buildComponents(EcoExecutionContext context, ObjectBuilder objectBuilder)
throws ClassNotFoundException,
IllegalAccessException, InstantiationException,
NoSuchFieldException, InvocationTargetException {
diff --git a/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java
index bc8dd43..787ecb0 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java
@@ -44,7 +44,12 @@ public class ConfigBuilder {
private static final Integer MINIMUM_BYTES = 256000000;
private static final Integer MINIMUM_MB = 256;
- protected Config buildConfig(EcoTopologyDefinition topologyDefinition)
+ /**
+ * Build the config for a ECO topology definition
+ *
+ * @param topologyDefinition - ECO topology definition
+ */
+ public Config buildConfig(EcoTopologyDefinition topologyDefinition)
throws IllegalArgumentException {
Map<String, Object> configMap = topologyDefinition.getConfig();
diff --git a/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/heron/EcoBuilder.java
similarity index 89%
copy from eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
copy to eco/src/java/com/twitter/heron/eco/builder/heron/EcoBuilder.java
index 06f7a47..e21cde9 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/heron/EcoBuilder.java
@@ -11,15 +11,20 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.heron;
import java.lang.reflect.InvocationTargetException;
import java.util.logging.Logger;
-import org.apache.storm.topology.TopologyBuilder;
-
import com.twitter.heron.api.Config;
+import com.twitter.heron.api.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.builder.BoltBuilder;
+import com.twitter.heron.eco.builder.ComponentBuilder;
+import com.twitter.heron.eco.builder.ConfigBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
diff --git a/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/heron/SpoutBuilder.java
similarity index 88%
copy from eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
copy to eco/src/java/com/twitter/heron/eco/builder/heron/SpoutBuilder.java
index 19dbfc9..2289a56 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/heron/SpoutBuilder.java
@@ -11,12 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.heron;
import java.lang.reflect.InvocationTargetException;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.TopologyBuilder;
+import com.twitter.heron.api.spout.IRichSpout;
+import com.twitter.heron.api.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.builder.ObjectBuilder;
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
diff --git a/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/heron/StreamBuilder.java
similarity index 90%
copy from eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
copy to eco/src/java/com/twitter/heron/eco/builder/heron/StreamBuilder.java
index 3ff50b3..f488523 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/heron/StreamBuilder.java
@@ -11,21 +11,23 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.heron;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.IWindowedBolt;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
+import com.twitter.heron.api.bolt.IBasicBolt;
+import com.twitter.heron.api.bolt.IRichBolt;
+import com.twitter.heron.api.bolt.IWindowedBolt;
+import com.twitter.heron.api.grouping.CustomStreamGrouping;
+import com.twitter.heron.api.topology.BoltDeclarer;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.utils.Utils;
+
+import com.twitter.heron.eco.builder.ObjectBuilder;
import com.twitter.heron.eco.definition.ComponentStream;
import com.twitter.heron.eco.definition.EcoExecutionContext;
diff --git a/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/storm/EcoBuilder.java
similarity index 91%
rename from eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
rename to eco/src/java/com/twitter/heron/eco/builder/storm/EcoBuilder.java
index 06f7a47..4900e73 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/storm/EcoBuilder.java
@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.storm;
import java.lang.reflect.InvocationTargetException;
@@ -20,6 +20,11 @@ import java.util.logging.Logger;
import org.apache.storm.topology.TopologyBuilder;
import com.twitter.heron.api.Config;
+import com.twitter.heron.eco.builder.BoltBuilder;
+import com.twitter.heron.eco.builder.ComponentBuilder;
+import com.twitter.heron.eco.builder.ConfigBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
diff --git a/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/storm/SpoutBuilder.java
similarity index 94%
rename from eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
rename to eco/src/java/com/twitter/heron/eco/builder/storm/SpoutBuilder.java
index 19dbfc9..775d30d 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/storm/SpoutBuilder.java
@@ -11,13 +11,15 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.storm;
import java.lang.reflect.InvocationTargetException;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
import com.twitter.heron.eco.definition.ObjectDefinition;
diff --git a/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java b/eco/src/java/com/twitter/heron/eco/builder/storm/StreamBuilder.java
similarity index 98%
rename from eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
rename to eco/src/java/com/twitter/heron/eco/builder/storm/StreamBuilder.java
index 3ff50b3..6777890 100644
--- a/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
+++ b/eco/src/java/com/twitter/heron/eco/builder/storm/StreamBuilder.java
@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.storm;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
@@ -27,6 +27,8 @@ import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.ComponentStream;
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
diff --git a/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java b/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java
index b5b8ad9..132bb0d 100644
--- a/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java
+++ b/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java
@@ -22,6 +22,7 @@ import java.util.Map;
public class EcoTopologyDefinition {
private String name;
+ private String type;
private Map<String, Object> config = new HashMap<>();
private Map<String, SpoutDefinition> spouts = new LinkedHashMap<>();
private Map<String, BoltDefinition> bolts = new LinkedHashMap<>();
@@ -94,10 +95,25 @@ public class EcoTopologyDefinition {
}
public String getName() {
-
return name;
}
+ public String getType() {
+ if (type == null || "storm".equals(type)) {
+ return "storm";
+ }
+
+ if ("heron".equals(type)) {
+ return "heron";
+ }
+
+ return null;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
public void setName(String name) {
this.name = name;
}
diff --git a/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java b/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java
index 07d65e5..8383bc4 100644
--- a/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java
+++ b/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java
@@ -13,17 +13,27 @@
// limitations under the License.
package com.twitter.heron.eco.submit;
+
import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import com.twitter.heron.api.Config;
+import com.twitter.heron.api.HeronSubmitter;
+import com.twitter.heron.api.HeronTopology;
public class EcoSubmitter {
- public void submitTopology(String topologyName, Config topologyConfig, StormTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
+ public void submitStormTopology(String topologyName,
+ Config topologyConfig, StormTopology topology)
+ throws org.apache.storm.generated.AlreadyAliveException,
+ org.apache.storm.generated.InvalidTopologyException {
StormSubmitter.submitTopology(topologyName, topologyConfig, topology);
}
+
+ public void submitHeronTopology(String topologyName,
+ Config topologyConfig, HeronTopology topology)
+ throws com.twitter.heron.api.exception.AlreadyAliveException,
+ com.twitter.heron.api.exception.InvalidTopologyException {
+ HeronSubmitter.submitTopology(topologyName, topologyConfig, topology);
+ }
}
diff --git a/eco/tests/java/BUILD b/eco/tests/java/BUILD
index efcc0bc..388df82 100644
--- a/eco/tests/java/BUILD
+++ b/eco/tests/java/BUILD
@@ -7,6 +7,12 @@ test_deps_files = [
heron_local_deps = [
"//eco/src/java:eco-java",
+ "//eco/src/java:eco-submit-java",
+ "//eco/src/java:eco-builder-java",
+ "//eco/src/java:eco-defs-java",
+ "//eco/src/java:eco-parser-java",
+ "//eco/src/java:eco-storm-builder-java",
+ "//eco/src/java:eco-heron-builder-java",
"//heron/api/src/java:api-java-low-level",
"//storm-compatibility/src/java:storm-compatibility-java",
]
@@ -14,8 +20,15 @@ heron_local_deps = [
eco_test_deps = heron_local_deps + test_deps_files
java_test(
- name = "EcoBuilderTest",
- srcs = glob(["com/twitter/heron/eco/builder/EcoBuilderTest.java"]),
+ name = "StormEcoBuilderTest",
+ srcs = glob(["com/twitter/heron/eco/builder/storm/StormEcoBuilderTest.java"]),
+ deps = eco_test_deps,
+ size = "small",
+)
+
+java_test(
+ name = "HeronEcoBuilderTest",
+ srcs = glob(["com/twitter/heron/eco/builder/heron/HeronEcoBuilderTest.java"]),
deps = eco_test_deps,
size = "small",
)
@@ -49,15 +62,29 @@ java_test(
)
java_test(
- name = "SpoutBuilderTest",
- srcs = glob(["com/twitter/heron/eco/builder/SpoutBuilderTest.java"]),
+ name = "StormSpoutBuilderTest",
+ srcs = glob(["com/twitter/heron/eco/builder/storm/StormSpoutBuilderTest.java"]),
+ deps = eco_test_deps,
+ size = "small"
+)
+
+java_test(
+ name = "HeronSpoutBuilderTest",
+ srcs = glob(["com/twitter/heron/eco/builder/heron/HeronSpoutBuilderTest.java"]),
+ deps = eco_test_deps,
+ size = "small"
+)
+
+java_test(
+ name = "StormStreamBuilderTest",
+ srcs = glob(["com/twitter/heron/eco/builder/storm/StormStreamBuilderTest.java"]),
deps = eco_test_deps,
size = "small"
)
java_test(
- name = "StreamBuilderTest",
- srcs = glob(["com/twitter/heron/eco/builder/StreamBuilderTest.java"]),
+ name = "HeronStreamBuilderTest",
+ srcs = glob(["com/twitter/heron/eco/builder/heron/HeronStreamBuilderTest.java"]),
deps = eco_test_deps,
size = "small"
)
diff --git a/eco/tests/java/com/twitter/heron/eco/EcoTest.java b/eco/tests/java/com/twitter/heron/eco/EcoTest.java
index cd08281..35c9006 100644
--- a/eco/tests/java/com/twitter/heron/eco/EcoTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/EcoTest.java
@@ -16,7 +16,6 @@ package com.twitter.heron.eco;
import java.io.FileInputStream;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -27,9 +26,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import com.twitter.heron.api.Config;
-import com.twitter.heron.eco.builder.EcoBuilder;
-import com.twitter.heron.eco.builder.ObjectBuilder;
-import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.api.HeronTopology;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
import com.twitter.heron.eco.parser.EcoParser;
import com.twitter.heron.eco.submit.EcoSubmitter;
@@ -44,49 +41,58 @@ import static org.powermock.api.mockito.PowerMockito.when;
public class EcoTest {
@Mock
- private EcoBuilder mockEcoBuilder;
- @Mock
private EcoParser mockEcoParser;
@Mock
- private TopologyBuilder mockTopologyBuilder;
- @Mock
private EcoSubmitter mockEcoSubmitter;
@InjectMocks
private Eco subject;
@After
public void ensureNoUnexpectedMockInteractions() {
- Mockito.verifyNoMoreInteractions(mockEcoBuilder,
- mockEcoParser,
- mockTopologyBuilder,
+ Mockito.verifyNoMoreInteractions(mockEcoParser,
mockEcoSubmitter);
}
@Test
- public void testSubmit_AllGood_BehavesAsExpected() throws Exception {
+ public void testSubmit_StormTopologyType_BehavesAsExpected() throws Exception {
FileInputStream mockStream = PowerMockito.mock(FileInputStream.class);
FileInputStream mockPropsStream = PowerMockito.mock(FileInputStream.class);
final String topologyName = "the name";
EcoTopologyDefinition topologyDefinition = new EcoTopologyDefinition();
topologyDefinition.setName(topologyName);
- Config config = new Config();
when(mockEcoParser.parseFromInputStream(eq(mockStream), eq(mockPropsStream), eq(false)))
.thenReturn(topologyDefinition);
- when(mockEcoBuilder.buildConfig(eq(topologyDefinition))).thenReturn(config);
- when(mockEcoBuilder.buildTopologyBuilder(any(EcoExecutionContext.class),
- any(ObjectBuilder.class))).thenReturn(mockTopologyBuilder);
subject.submit(mockStream, mockPropsStream, false);
verify(mockEcoParser).parseFromInputStream(same(mockStream),
same(mockPropsStream), eq(false));
- verify(mockEcoBuilder).buildConfig(same(topologyDefinition));
- verify(mockEcoBuilder).buildTopologyBuilder(any(EcoExecutionContext.class),
- any(ObjectBuilder.class));
- verify(mockTopologyBuilder).createTopology();
- verify(mockEcoSubmitter).submitTopology(any(String.class), any(Config.class),
+
+ verify(mockEcoSubmitter).submitStormTopology(any(String.class), any(Config.class),
any(StormTopology.class));
}
+
+ @Test
+ public void testSubmit_HeronTopologyType_BehavesAsExpected() throws Exception {
+ FileInputStream mockStream = PowerMockito.mock(FileInputStream.class);
+ FileInputStream mockPropsStream = PowerMockito.mock(FileInputStream.class);
+
+ final String topologyName = "the name";
+ EcoTopologyDefinition topologyDefinition = new EcoTopologyDefinition();
+ topologyDefinition.setName(topologyName);
+ topologyDefinition.setType("heron");
+
+ when(mockEcoParser.parseFromInputStream(eq(mockStream), eq(mockPropsStream), eq(false)))
+ .thenReturn(topologyDefinition);
+
+ subject.submit(mockStream, mockPropsStream, false);
+
+ verify(mockEcoParser).parseFromInputStream(same(mockStream),
+ same(mockPropsStream), eq(false));
+
+ verify(mockEcoSubmitter).submitHeronTopology(any(String.class), any(Config.class),
+ any(HeronTopology.class));
+ }
}
diff --git a/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java b/eco/tests/java/com/twitter/heron/eco/builder/heron/HeronEcoBuilderTest.java
similarity index 92%
copy from eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
copy to eco/tests/java/com/twitter/heron/eco/builder/heron/HeronEcoBuilderTest.java
index d43ecbc..b360a35 100644
--- a/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/builder/heron/HeronEcoBuilderTest.java
@@ -11,13 +11,12 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.heron;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.storm.topology.TopologyBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -27,6 +26,13 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.twitter.heron.api.Config;
+import com.twitter.heron.api.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.builder.BoltBuilder;
+import com.twitter.heron.eco.builder.ComponentBuilder;
+import com.twitter.heron.eco.builder.ConfigBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
@@ -42,7 +48,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class EcoBuilderTest {
+public class HeronEcoBuilderTest {
@Mock
private SpoutBuilder mockSpoutBuilder;
diff --git a/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java b/eco/tests/java/com/twitter/heron/eco/builder/heron/HeronSpoutBuilderTest.java
similarity index 91%
copy from eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
copy to eco/tests/java/com/twitter/heron/eco/builder/heron/HeronSpoutBuilderTest.java
index 12abe93..bc57eae 100644
--- a/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/builder/heron/HeronSpoutBuilderTest.java
@@ -11,18 +11,13 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.heron;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -31,6 +26,13 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import com.twitter.heron.api.spout.IRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.topology.TopologyContext;
+
+import com.twitter.heron.eco.builder.ObjectBuilder;
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
import com.twitter.heron.eco.definition.SpoutDefinition;
@@ -42,7 +44,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class SpoutBuilderTest {
+public class HeronSpoutBuilderTest {
@Mock
private EcoExecutionContext mockContext;
diff --git a/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java b/eco/tests/java/com/twitter/heron/eco/builder/heron/HeronStreamBuilderTest.java
similarity index 91%
copy from eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
copy to eco/tests/java/com/twitter/heron/eco/builder/heron/HeronStreamBuilderTest.java
index 9c9b868..1ac6691 100644
--- a/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/builder/heron/HeronStreamBuilderTest.java
@@ -11,29 +11,13 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.heron;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.task.WorkerTopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.IWindowedBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.windowing.TimestampExtractor;
-import org.apache.storm.windowing.TupleWindow;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,6 +26,22 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import com.twitter.heron.api.bolt.BasicOutputCollector;
+import com.twitter.heron.api.bolt.IBasicBolt;
+import com.twitter.heron.api.bolt.IRichBolt;
+import com.twitter.heron.api.bolt.IWindowedBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.grouping.CustomStreamGrouping;
+import com.twitter.heron.api.topology.BoltDeclarer;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.api.windowing.TimestampExtractor;
+import com.twitter.heron.api.windowing.TupleWindow;
+
+import com.twitter.heron.eco.builder.ObjectBuilder;
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
import com.twitter.heron.eco.definition.GroupingDefinition;
@@ -56,7 +56,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class StreamBuilderTest {
+public class HeronStreamBuilderTest {
@Mock
private EcoTopologyDefinition mockDefinition;
@@ -265,13 +265,13 @@ public class StreamBuilderTest {
private class MockCustomStreamGrouping implements CustomStreamGrouping {
@Override
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
- List<Integer> targetTasks) {
+ public void prepare(TopologyContext context, String component,
+ String streamId, List<Integer> targetTasks) {
}
@Override
- public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ public List<Integer> chooseTasks(List<Object> values) {
return null;
}
}
@@ -280,7 +280,7 @@ public class StreamBuilderTest {
private class MockIRichBolt implements IRichBolt {
@Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ public void prepare(Map heronConf, TopologyContext context, OutputCollector collector) {
}
@@ -343,7 +343,7 @@ public class StreamBuilderTest {
@SuppressWarnings({"rawtypes", "unchecked", "serial"})
public class MockIBasicBolt implements IBasicBolt {
@Override
- public void prepare(Map stormConf, TopologyContext context) {
+ public void prepare(Map heronConf, TopologyContext context) {
}
diff --git a/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java b/eco/tests/java/com/twitter/heron/eco/builder/storm/StormEcoBuilderTest.java
similarity index 93%
rename from eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
rename to eco/tests/java/com/twitter/heron/eco/builder/storm/StormEcoBuilderTest.java
index d43ecbc..0fffb6d 100644
--- a/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/builder/storm/StormEcoBuilderTest.java
@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.storm;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
@@ -27,6 +27,11 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.twitter.heron.api.Config;
+import com.twitter.heron.eco.builder.BoltBuilder;
+import com.twitter.heron.eco.builder.ComponentBuilder;
+import com.twitter.heron.eco.builder.ConfigBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
@@ -42,7 +47,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class EcoBuilderTest {
+public class StormEcoBuilderTest {
@Mock
private SpoutBuilder mockSpoutBuilder;
diff --git a/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java b/eco/tests/java/com/twitter/heron/eco/builder/storm/StormSpoutBuilderTest.java
similarity index 97%
rename from eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
rename to eco/tests/java/com/twitter/heron/eco/builder/storm/StormSpoutBuilderTest.java
index 12abe93..e659e16 100644
--- a/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/builder/storm/StormSpoutBuilderTest.java
@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.storm;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -31,6 +31,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
import com.twitter.heron.eco.definition.SpoutDefinition;
@@ -42,7 +44,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class SpoutBuilderTest {
+public class StormSpoutBuilderTest {
@Mock
private EcoExecutionContext mockContext;
diff --git a/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java b/eco/tests/java/com/twitter/heron/eco/builder/storm/StormStreamBuilderTest.java
similarity index 98%
rename from eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
rename to eco/tests/java/com/twitter/heron/eco/builder/storm/StormStreamBuilderTest.java
index 9c9b868..abfe236 100644
--- a/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/builder/storm/StormStreamBuilderTest.java
@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.eco.builder;
+package com.twitter.heron.eco.builder.storm;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -42,6 +42,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+
import com.twitter.heron.eco.definition.EcoExecutionContext;
import com.twitter.heron.eco.definition.EcoTopologyDefinition;
import com.twitter.heron.eco.definition.GroupingDefinition;
@@ -56,7 +58,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class StreamBuilderTest {
+public class StormStreamBuilderTest {
@Mock
private EcoTopologyDefinition mockDefinition;
diff --git a/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java b/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java
index 5c5a756..d027c12 100644
--- a/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java
@@ -172,6 +172,7 @@ public class EcoParserTest {
+ "---\n"
+ "\n"
+ "name: \"kafka-topology\"\n"
+ + "type: \"heron\"\n"
+ "\n"
+ "# Components\n"
+ "# Components are analagous to Spring beans. They are meant to be used as constructor,\n"
@@ -349,6 +350,7 @@ public class EcoParserTest {
List<BeanDefinition> components = topologyDefinition.getComponents();
assertEquals("kafka-topology", topologyDefinition.getName());
+ assertEquals("heron", topologyDefinition.getType());
assertEquals(4, components.size());
BeanDefinition stringSchemeComponent = components.get(0);
diff --git a/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java b/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java
index 7929a94..25deb72 100644
--- a/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java
+++ b/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java
@@ -15,6 +15,7 @@ package com.twitter.heron.eco.submit;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -23,13 +24,15 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.twitter.heron.api.Config;
+import com.twitter.heron.api.HeronSubmitter;
+import com.twitter.heron.api.HeronTopology;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.internal.verification.VerificationModeFactory.times;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormSubmitter.class)
+@PrepareForTest({StormSubmitter.class, HeronSubmitter.class})
public class EcoSubmitterTest {
private EcoSubmitter subject;
@@ -40,7 +43,7 @@ public class EcoSubmitterTest {
}
@Test
- public void submitTopology_AllGood_BehavesAsExpected()
+ public void submitStormTopology_AllGood_BehavesAsExpected()
throws Exception {
Config config = new Config();
StormTopology topology = new StormTopology();
@@ -48,9 +51,24 @@ public class EcoSubmitterTest {
PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology",
any(String.class), any(Config.class), any(StormTopology.class));
- subject.submitTopology("name", config, topology);
+ subject.submitStormTopology("name", config, topology);
PowerMockito.verifyStatic(times(1));
StormSubmitter.submitTopology(anyString(), any(Config.class), any(StormTopology.class));
}
+
+ @Test
+ public void submitHeronTopology_AllGood_BehavesAsExpected()
+ throws Exception {
+ Config config = new Config();
+ HeronTopology topology = new HeronTopology(null);
+ PowerMockito.spy(HeronSubmitter.class);
+ PowerMockito.doNothing().when(HeronSubmitter.class, "submitTopology",
+ any(String.class), any(Config.class), any(HeronTopology.class));
+
+ subject.submitHeronTopology("name", config, topology);
+ PowerMockito.verifyStatic(times(1));
+ HeronSubmitter.submitTopology(anyString(), any(Config.class), any(HeronTopology.class));
+
+ }
}
diff --git a/scripts/get_all_heron_paths.sh b/scripts/get_all_heron_paths.sh
index 217f07b..8df9b52 100755
--- a/scripts/get_all_heron_paths.sh
+++ b/scripts/get_all_heron_paths.sh
@@ -25,7 +25,7 @@ set +e
# Build everything
DIR=`dirname $0`
source ${DIR}/detect_os_type.sh
-bazel build --config=`platform` {heron,integration_test,tools/java,examples,heronpy,storm-compatibility,storm-compatibility-examples,eco,eco-examples}/...
+bazel build --config=`platform` {heron,integration_test,tools/java,examples,heronpy,storm-compatibility,storm-compatibility-examples,eco,eco-storm-examples,eco-heron-examples}/...
result=$?
if [ "${result}" -eq "0" ] ; then
echo "Bazel build successful!!"
@@ -67,7 +67,7 @@ function get_package_of() {
}
function get_heron_java_paths() {
- local java_paths=$(find {heron,heron/tools,tools,integration_test,storm-compatibility,contrib,eco,eco-examples} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
+ local java_paths=$(find {heron,heron/tools,tools,integration_test,storm-compatibility,contrib,eco,eco-storm-examples,eco-heron-examples} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
if [ "$(uname -s | tr 'A-Z' 'a-z')" != "darwin" ]; then
java_paths=$(echo "${java_paths}" | fgrep -v "/objc_tools/")
fi
diff --git a/scripts/travis/build.sh b/scripts/travis/build.sh
index e4739f8..e39d18d 100755
--- a/scripts/travis/build.sh
+++ b/scripts/travis/build.sh
@@ -66,7 +66,7 @@ start_timer "$T"
python ${UTILS}/save-logs.py "heron_build.txt" bazel\
--bazelrc=tools/travis/bazel.rc build --config=$PLATFORM heron/... \
heronpy/... examples/... storm-compatibility-examples/... \
- eco-examples/...
+ eco-storm-examples/... eco-heron-examples/...
end_timer "$T"
# run heron unit tests
@@ -77,7 +77,7 @@ python ${UTILS}/save-logs.py "heron_test_non_flaky.txt" bazel\
--test_summary=detailed --test_output=errors\
--config=$PLATFORM --test_tag_filters=-flaky heron/... \
heronpy/... examples/... storm-compatibility-examples/... \
- eco-examples/...
+ eco-storm-examples/... eco-heron-examples/...
end_timer "$T"
# flaky tests are often due to test port race conditions,
@@ -89,7 +89,7 @@ python ${UTILS}/save-logs.py "heron_test_flaky.txt" bazel\
--test_summary=detailed --test_output=errors\
--config=$PLATFORM --test_tag_filters=flaky --jobs=0 heron/... \
heronpy/... examples/... storm-compatibility-examples/... \
- eco-examples/...
+ eco-storm-examples/... eco-heron-examples/...
end_timer "$T"
# build packages
diff --git a/third_party/cereal/BUILD b/third_party/cereal/BUILD
new file mode 100644
index 0000000..3f7e5ec
--- /dev/null
+++ b/third_party/cereal/BUILD
@@ -0,0 +1,126 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+package_name = "cereal"
+package_version = "1.2.1"
+
+package_file = package_name + "-" + package_version + ".tar.gz"
+package_dir = package_name + "-" + package_version
+
+file_list = [
+ "include/cereal/access.hpp",
+ "include/cereal/archives/adapters.hpp",
+ "include/cereal/archives/binary.hpp",
+ "include/cereal/archives/json.hpp",
+ "include/cereal/archives/portable_binary.hpp",
+ "include/cereal/archives/xml.hpp",
+ "include/cereal/cereal.hpp",
+ "include/cereal/details/helpers.hpp",
+ "include/cereal/details/polymorphic_impl.hpp",
+ "include/cereal/details/polymorphic_impl_fwd.hpp",
+ "include/cereal/details/static_object.hpp",
+ "include/cereal/details/traits.hpp",
+ "include/cereal/details/util.hpp",
+ "include/cereal/external/base64.hpp",
+ "include/cereal/external/rapidjson/allocators.h",
+ "include/cereal/external/rapidjson/document.h",
+ "include/cereal/external/rapidjson/encodedstream.h",
+ "include/cereal/external/rapidjson/encodings.h",
+ "include/cereal/external/rapidjson/error/en.h",
+ "include/cereal/external/rapidjson/error/error.h",
+ "include/cereal/external/rapidjson/filereadstream.h",
+ "include/cereal/external/rapidjson/filewritestream.h",
+ "include/cereal/external/rapidjson/fwd.h",
+ "include/cereal/external/rapidjson/internal/biginteger.h",
+ "include/cereal/external/rapidjson/internal/diyfp.h",
+ "include/cereal/external/rapidjson/internal/dtoa.h",
+ "include/cereal/external/rapidjson/internal/ieee754.h",
+ "include/cereal/external/rapidjson/internal/itoa.h",
+ "include/cereal/external/rapidjson/internal/meta.h",
+ "include/cereal/external/rapidjson/internal/pow10.h",
+ "include/cereal/external/rapidjson/internal/regex.h",
+ "include/cereal/external/rapidjson/internal/stack.h",
+ "include/cereal/external/rapidjson/internal/strfunc.h",
+ "include/cereal/external/rapidjson/internal/strtod.h",
+ "include/cereal/external/rapidjson/internal/swap.h",
+ "include/cereal/external/rapidjson/istreamwrapper.h",
+ "include/cereal/external/rapidjson/memorybuffer.h",
+ "include/cereal/external/rapidjson/memorystream.h",
+ "include/cereal/external/rapidjson/msinttypes/inttypes.h",
+ "include/cereal/external/rapidjson/msinttypes/stdint.h",
+ "include/cereal/external/rapidjson/ostreamwrapper.h",
+ "include/cereal/external/rapidjson/pointer.h",
+ "include/cereal/external/rapidjson/prettywriter.h",
+ "include/cereal/external/rapidjson/rapidjson.h",
+ "include/cereal/external/rapidjson/reader.h",
+ "include/cereal/external/rapidjson/schema.h",
+ "include/cereal/external/rapidjson/stream.h",
+ "include/cereal/external/rapidjson/stringbuffer.h",
+ "include/cereal/external/rapidjson/writer.h",
+ "include/cereal/external/rapidxml/rapidxml.hpp",
+ "include/cereal/external/rapidxml/rapidxml_iterators.hpp",
+ "include/cereal/external/rapidxml/rapidxml_print.hpp",
+ "include/cereal/external/rapidxml/rapidxml_utils.hpp",
+ "include/cereal/macros.hpp",
+ "include/cereal/types/array.hpp",
+ "include/cereal/types/base_class.hpp",
+ "include/cereal/types/bitset.hpp",
+ "include/cereal/types/boost_variant.hpp",
+ "include/cereal/types/chrono.hpp",
+ "include/cereal/types/common.hpp",
+ "include/cereal/types/complex.hpp",
+ "include/cereal/types/concepts/pair_associative_container.hpp",
+ "include/cereal/types/deque.hpp",
+ "include/cereal/types/forward_list.hpp",
+ "include/cereal/types/functional.hpp",
+ "include/cereal/types/list.hpp",
+ "include/cereal/types/map.hpp",
+ "include/cereal/types/memory.hpp",
+ "include/cereal/types/polymorphic.hpp",
+ "include/cereal/types/queue.hpp",
+ "include/cereal/types/set.hpp",
+ "include/cereal/types/stack.hpp",
+ "include/cereal/types/string.hpp",
+ "include/cereal/types/tuple.hpp",
+ "include/cereal/types/unordered_map.hpp",
+ "include/cereal/types/unordered_set.hpp",
+ "include/cereal/types/utility.hpp",
+ "include/cereal/types/valarray.hpp",
+ "include/cereal/types/vector.hpp",
+]
+
+genrule(
+ name = "cereal-srcs",
+ srcs = [
+ package_file,
+ ],
+ outs = file_list,
+ cmd = "\n".join([
+ "export WORKSPACE_ROOT=$$(pwd)",
+ "export INSTALL_DIR=$$(pwd)/$(@D)",
+ "export TMP_DIR=$$(mktemp -d -t cereal.XXXXX)",
+ "mkdir -p $$TMP_DIR",
+ "cp -R $(SRCS) $$TMP_DIR",
+ "cd $$TMP_DIR",
+ "tar xfz " + package_file,
+ "cd " + package_dir,
+ "$$WORKSPACE_ROOT/$(location //scripts/compile:env_exec) cmake -Wno-dev -DCMAKE_INSTALL_PREFIX:PATH=$$INSTALL_DIR .",
+ "$$WORKSPACE_ROOT/$(location //scripts/compile:env_exec) make install",
+ "rm -rf $$TMP_DIR",
+ ]),
+ tools = [
+ "//scripts/compile:env_exec",
+ ],
+)
+
+cc_library(
+ name = "cereal-cxx",
+ srcs = [
+ "empty.cc",
+ ] + file_list,
+ includes = [
+ "include",
+ ],
+ linkstatic = 1,
+)
diff --git a/third_party/cereal/cereal-1.2.1.tar.gz b/third_party/cereal/cereal-1.2.1.tar.gz
new file mode 100644
index 0000000..7f469ee
Binary files /dev/null and b/third_party/cereal/cereal-1.2.1.tar.gz differ
diff --git a/third_party/cereal/empty.cc b/third_party/cereal/empty.cc
new file mode 100644
index 0000000..e69de29
diff --git a/tools/rules/heron_examples.bzl b/tools/rules/heron_examples.bzl
index 2a411f3..a6c069e 100644
--- a/tools/rules/heron_examples.bzl
+++ b/tools/rules/heron_examples.bzl
@@ -15,7 +15,8 @@ def heron_examples_conf_files():
def heron_examples_yaml_files():
return [
- "//eco-examples/src/java:heron-eco-examples-support",
+ "//eco-storm-examples/src/java:storm-eco-examples-support",
+ "//eco-heron-examples/src/java:heron-eco-examples-support",
]
def heron_examples_lib_files():
@@ -23,5 +24,6 @@ def heron_examples_lib_files():
"//examples/src/java:heron-api-examples",
"//examples/src/java:heron-streamlet-examples",
"//examples/src/scala:heron-streamlet-scala-examples",
- "//eco-examples/src/java:heron-eco-examples",
+ "//eco-storm-examples/src/java:storm-eco-examples",
+ "//eco-heron-examples/src/java:heron-eco-examples",
]
--
To stop receiving notification emails like this one, please contact
karthikz@apache.org.