You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:19 UTC

[19/53] [abbrv] beam git commit: jstorm-runner: remove AdaptorBasicBolt and AdaptorBasicSpout.

jstorm-runner: remove AdaptorBasicBolt and AdaptorBasicSpout.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9309ac49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9309ac49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9309ac49

Branch: refs/heads/jstorm-runner
Commit: 9309ac49d81e1d6dfd694ec885cdb12a3db53483
Parents: 5a15d54
Author: Pei He <pe...@apache.org>
Authored: Fri Jul 14 14:50:47 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunner.java       | 16 +++---------
 .../jstorm/translation/TranslationContext.java  | 12 ++++-----
 .../translation/runtime/AdaptorBasicBolt.java   | 27 --------------------
 .../translation/runtime/AdaptorBasicSpout.java  | 27 --------------------
 .../translation/runtime/ExecutorsBolt.java      |  3 ++-
 .../runtime/UnboundedSourceSpout.java           |  3 ++-
 6 files changed, 14 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 00ec7f6..8782130 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -41,8 +41,6 @@ import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSeria
 import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent;
-import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
 import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
 import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt;
 import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout;
@@ -155,18 +153,12 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
 
   private AbstractComponent getComponent(
       String id, TranslationContext.ExecutionGraphContext context) {
-    AbstractComponent component = null;
-    AdaptorBasicSpout spout = context.getSpout(id);
+    AbstractComponent spout = context.getSpout(id);
     if (spout != null) {
-      component = spout;
+      return spout;
     } else {
-      AdaptorBasicBolt bolt = context.getBolt(id);
-      if (bolt != null) {
-        component = bolt;
-      }
+      return context.getBolt(id);
     }
-
-    return component;
   }
 
   private StormTopology getTopology(
@@ -176,7 +168,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
         isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
 
     int parallelismNumber = options.getParallelismNumber();
-    Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
+    Map<String, UnboundedSourceSpout> spouts = context.getSpouts();
     for (String id : spouts.keySet()) {
       IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
       builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));

http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 1230a31..28d102d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -34,9 +34,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
 import org.apache.beam.runners.jstorm.translation.runtime.Executor;
 import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
 import org.apache.beam.runners.jstorm.translation.translator.Stream;
 import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
 import org.apache.beam.runners.jstorm.util.RunnerUtils;
@@ -333,7 +333,7 @@ public class TranslationContext {
    */
   public static class ExecutionGraphContext {
 
-    private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
+    private final Map<String, UnboundedSourceSpout> spoutMap = new HashMap<>();
     private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
 
     // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
@@ -344,7 +344,7 @@ public class TranslationContext {
 
     private int id = 1;
 
-    public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
+    public void registerSpout(UnboundedSourceSpout spout, TaggedPValue output) {
       checkNotNull(spout, "spout");
       checkNotNull(output, "output");
       String name = "spout" + genId();
@@ -354,14 +354,14 @@ public class TranslationContext {
           Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
     }
 
-    public AdaptorBasicSpout getSpout(String id) {
+    public UnboundedSourceSpout getSpout(String id) {
       if (Strings.isNullOrEmpty(id)) {
         return null;
       }
       return this.spoutMap.get(id);
     }
 
-    public Map<String, AdaptorBasicSpout> getSpouts() {
+    public Map<String, UnboundedSourceSpout> getSpouts() {
       return this.spoutMap;
     }
 
@@ -418,7 +418,7 @@ public class TranslationContext {
     public String toString() {
       List<String> ret = new ArrayList<>();
       ret.add("SPOUT");
-      for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
+      for (Map.Entry<String, UnboundedSourceSpout> entry : spoutMap.entrySet()) {
         ret.add(entry.getKey() + ": " + entry.getValue().toString());
       }
       ret.add("BOLT");

http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
deleted file mode 100644
index d8d4d46..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.topology.IRichBatchBolt;
-
-/**
- * Adaptor bolt of JStorm extends {@link AbstractComponent}.
- */
-public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
deleted file mode 100644
index 814d416..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * Adaptor bolt of JStorm extends {@link AbstractComponent}.
- */
-public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
index d33c17a..0366c13 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBatchBolt;
 import backtype.storm.tuple.ITupleExt;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
@@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory;
 /**
  * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG.
  */
-public class ExecutorsBolt extends AdaptorBasicBolt {
+public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
   private static final long serialVersionUID = -7751043327801735211L;
 
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
index 006cd47..690824d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Values;
 import com.alibaba.jstorm.utils.KryoSerializer;
 import java.io.IOException;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * Spout implementation that wraps a Beam UnboundedSource.
  * TODO: add wrapper to support metrics in UnboundedSource.
  */
-public class UnboundedSourceSpout extends AdaptorBasicSpout {
+public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
 
   private final String description;