You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:19 UTC
[19/53] [abbrv] beam git commit: jstorm-runner: remove
AdaptorBasicBolt and AdaptorBasicSpout.
jstorm-runner: remove AdaptorBasicBolt and AdaptorBasicSpout.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9309ac49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9309ac49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9309ac49
Branch: refs/heads/jstorm-runner
Commit: 9309ac49d81e1d6dfd694ec885cdb12a3db53483
Parents: 5a15d54
Author: Pei He <pe...@apache.org>
Authored: Fri Jul 14 14:50:47 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800
----------------------------------------------------------------------
.../beam/runners/jstorm/JStormRunner.java | 16 +++---------
.../jstorm/translation/TranslationContext.java | 12 ++++-----
.../translation/runtime/AdaptorBasicBolt.java | 27 --------------------
.../translation/runtime/AdaptorBasicSpout.java | 27 --------------------
.../translation/runtime/ExecutorsBolt.java | 3 ++-
.../runtime/UnboundedSourceSpout.java | 3 ++-
6 files changed, 14 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 00ec7f6..8782130 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -41,8 +41,6 @@ import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSeria
import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent;
-import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt;
import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout;
@@ -155,18 +153,12 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
private AbstractComponent getComponent(
String id, TranslationContext.ExecutionGraphContext context) {
- AbstractComponent component = null;
- AdaptorBasicSpout spout = context.getSpout(id);
+ AbstractComponent spout = context.getSpout(id);
if (spout != null) {
- component = spout;
+ return spout;
} else {
- AdaptorBasicBolt bolt = context.getBolt(id);
- if (bolt != null) {
- component = bolt;
- }
+ return context.getBolt(id);
}
-
- return component;
}
private StormTopology getTopology(
@@ -176,7 +168,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
int parallelismNumber = options.getParallelismNumber();
- Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
+ Map<String, UnboundedSourceSpout> spouts = context.getSpouts();
for (String id : spouts.keySet()) {
IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 1230a31..28d102d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -34,9 +34,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
import org.apache.beam.runners.jstorm.translation.runtime.Executor;
import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
import org.apache.beam.runners.jstorm.translation.translator.Stream;
import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
import org.apache.beam.runners.jstorm.util.RunnerUtils;
@@ -333,7 +333,7 @@ public class TranslationContext {
*/
public static class ExecutionGraphContext {
- private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
+ private final Map<String, UnboundedSourceSpout> spoutMap = new HashMap<>();
private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
// One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
@@ -344,7 +344,7 @@ public class TranslationContext {
private int id = 1;
- public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
+ public void registerSpout(UnboundedSourceSpout spout, TaggedPValue output) {
checkNotNull(spout, "spout");
checkNotNull(output, "output");
String name = "spout" + genId();
@@ -354,14 +354,14 @@ public class TranslationContext {
Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
}
- public AdaptorBasicSpout getSpout(String id) {
+ public UnboundedSourceSpout getSpout(String id) {
if (Strings.isNullOrEmpty(id)) {
return null;
}
return this.spoutMap.get(id);
}
- public Map<String, AdaptorBasicSpout> getSpouts() {
+ public Map<String, UnboundedSourceSpout> getSpouts() {
return this.spoutMap;
}
@@ -418,7 +418,7 @@ public class TranslationContext {
public String toString() {
List<String> ret = new ArrayList<>();
ret.add("SPOUT");
- for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
+ for (Map.Entry<String, UnboundedSourceSpout> entry : spoutMap.entrySet()) {
ret.add(entry.getKey() + ": " + entry.getValue().toString());
}
ret.add("BOLT");
http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
deleted file mode 100644
index d8d4d46..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.topology.IRichBatchBolt;
-
-/**
- * Adaptor bolt of JStorm extends {@link AbstractComponent}.
- */
-public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
deleted file mode 100644
index 814d416..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * Adaptor bolt of JStorm extends {@link AbstractComponent}.
- */
-public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
index d33c17a..0366c13 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBatchBolt;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
@@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory;
/**
* ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG.
*/
-public class ExecutorsBolt extends AdaptorBasicBolt {
+public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
private static final long serialVersionUID = -7751043327801735211L;
private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
index 006cd47..690824d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.utils.KryoSerializer;
import java.io.IOException;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
* Spout implementation that wraps a Beam UnboundedSource.
* TODO: add wrapper to support metrics in UnboundedSource.
*/
-public class UnboundedSourceSpout extends AdaptorBasicSpout {
+public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
private final String description;