Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/02 10:50:29 UTC
flink git commit: [FLINK-2525] Add configuration support in Storm-compatibility
Repository: flink
Updated Branches:
refs/heads/master 9f7110748 -> 9fe285a77
[FLINK-2525] Add configuration support in Storm-compatibility
This closes #1046
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fe285a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fe285a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fe285a7
Branch: refs/heads/master
Commit: 9fe285a77de5cd1a35ceb58f9295751fd3dd9e15
Parents: 9f71107
Author: ffbin <86...@qq.com>
Authored: Fri Oct 2 00:04:40 2015 +0200
Committer: mjsax <mj...@informatik.hu-berlin.de>
Committed: Fri Oct 2 10:48:48 2015 +0200
----------------------------------------------------------------------
docs/apis/storm_compatibility.md | 31 +++++
.../flink-storm-compatibility-core/README.md | 1 -
.../stormcompatibility/api/FlinkClient.java | 11 +-
.../api/FlinkLocalCluster.java | 26 ++--
.../stormcompatibility/util/StormConfig.java | 123 +++++++++++++++++++
.../wrappers/AbstractStormSpoutWrapper.java | 16 ++-
.../wrappers/StormBoltWrapper.java | 15 ++-
.../wrappers/FiniteStormSpoutWrapperTest.java | 25 ++--
.../wrappers/FiniteTestSpout.java | 3 +-
.../wrappers/StormBoltWrapperTest.java | 85 ++++++++++---
.../wrappers/StormFiniteSpoutWrapperTest.java | 28 ++---
.../wrappers/StormSpoutWrapperTest.java | 52 +++++++-
.../excamation/ExclamationTopology.java | 12 +-
.../excamation/ExclamationWithStormBolt.java | 23 ++--
.../excamation/ExclamationWithStormSpout.java | 14 ++-
.../excamation/StormExclamationLocal.java | 7 +-
.../stormoperators/ExclamationBolt.java | 23 +++-
.../util/FiniteStormFileSpout.java | 2 +
.../stormcompatibility/util/StormFileSpout.java | 12 +-
.../ExclamationWithStormBoltITCase.java | 4 +-
.../StormExclamationLocalITCase.java | 4 +-
.../flink/api/common/ExecutionConfig.java | 2 +-
22 files changed, 435 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index a6083f8..d676db8 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -169,10 +169,41 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
+## Configuring Spouts and Bolts
+
+In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is given to the `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
+This `Map` is provided by the user alongside the topology and is forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
+If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
+
+For embedded usage, Flink's configuration mechanism must be used.
+A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
+Flink's regular `Configuration` class can be used to configure Spouts and Bolts.
+However, `Configuration` does not support arbitrary key data types as Storm does (only `String` keys are allowed).
+Thus, Flink additionally provides the `StormConfig` class, which can be used like a raw `Map` to provide full compatibility with Storm.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+StormConfig config = new StormConfig();
+// set config values
+[...]
+
+// set global Storm configuration
+env.getConfig().setGlobalJobParameters(config);
+
+// assemble program with embedded Spouts and/or Bolts
+[...]
+~~~
+</div>
+</div>
+
## Multiple Output Streams
Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
+
For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitDataStream.select(...)`.
Flink provides the predefined output selector `FlinkStormStreamSelector<T>` for `.split(...)` already.
Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index aef4847..f42dc24 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -3,7 +3,6 @@
The Storm compatibility layer allows embedding spouts or bolts unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
The following Storm features are not (yet/fully) supported by the compatibility layer right now:
-* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
* topology and tuple meta information (ie, `TopologyContext` not fully supported)
* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
* for whole Storm topologies the following is not supported by Flink:
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 7078e90..4676102 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -32,6 +32,7 @@ import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import com.google.common.collect.Lists;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.stormcompatibility.util.StormConfig;
import scala.Some;
import scala.concurrent.Await;
@@ -66,7 +68,6 @@ import java.util.Map;
public class FlinkClient {
/** The client's configuration */
- @SuppressWarnings("unused")
private final Map<?,?> conf;
/** The jobmanager's host name */
private final String jobManagerHost;
@@ -161,7 +162,7 @@ public class FlinkClient {
*/
public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
topology)
- throws AlreadyAliveException, InvalidTopologyException {
+ throws AlreadyAliveException, InvalidTopologyException {
if (this.getTopologyJobId(name) != null) {
throw new AlreadyAliveException();
@@ -174,11 +175,15 @@ public class FlinkClient {
throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
}
+ /* set storm configuration */
+ if (this.conf != null) {
+ topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+ }
+
final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
final Configuration configuration = jobGraph.getJobConfiguration();
-
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
index c139201..9b3fb54 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.stormcompatibility.util.StormConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +46,11 @@ public class FlinkLocalCluster {
/** The log used by this mini cluster */
private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
-
+
/** The flink mini cluster on which to execute the programs */
private final FlinkMiniCluster flink;
-
+
public FlinkLocalCluster() {
this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
this.flink.start();
@@ -59,17 +60,22 @@ public class FlinkLocalCluster {
this.flink = Objects.requireNonNull(flink);
}
- public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
+ @SuppressWarnings("rawtypes")
+ public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
throws Exception {
this.submitTopologyWithOpts(topologyName, conf, topology, null);
}
- public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
- final SubmitOptions submitOpts) throws Exception {
-
+ @SuppressWarnings("rawtypes")
+ public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
LOG.info("Running Storm topology on FlinkLocalCluster");
+
+ if(conf != null) {
+ topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
+ }
+
JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
- flink.submitJobDetached(jobGraph);
+ this.flink.submitJobDetached(jobGraph);
}
public void killTopology(final String topologyName) {
@@ -115,7 +121,7 @@ public class FlinkLocalCluster {
// ------------------------------------------------------------------------
// Access to default local cluster
// ------------------------------------------------------------------------
-
+
// A different {@link FlinkLocalCluster} to be used for execution of ITCases
private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
@@ -138,7 +144,7 @@ public class FlinkLocalCluster {
public static void initialize(LocalClusterFactory clusterFactory) {
currentFactory = Objects.requireNonNull(clusterFactory);
}
-
+
// ------------------------------------------------------------------------
// Cluster factory
// ------------------------------------------------------------------------
@@ -159,7 +165,7 @@ public class FlinkLocalCluster {
* A factory that instantiates a FlinkLocalCluster.
*/
public static class DefaultLocalClusterFactory implements LocalClusterFactory {
-
+
@Override
public FlinkLocalCluster createLocalCluster() {
return new FlinkLocalCluster();
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
new file mode 100644
index 0000000..6726ae8
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+
+import backtype.storm.Config;
+
+/**
+ * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
+ * object) for embedded Spouts and Bolts.
+ */
+@SuppressWarnings("rawtypes")
+public final class StormConfig extends GlobalJobParameters implements Map {
+ private static final long serialVersionUID = 8019519109673698490L;
+
+ /** Contains the actual configuration that is provided to Spouts and Bolts. */
+ private final Map config = new HashMap();
+
+ /**
+ * Creates an empty configuration.
+ */
+ public StormConfig() {
+ }
+
+ /**
+ * Creates a configuration with initial values provided by the given {@code Map}.
+ *
+ * @param config
+ * Initial values for this configuration.
+ */
+ @SuppressWarnings("unchecked")
+ public StormConfig(Map config) {
+ this.config.putAll(config);
+ }
+
+
+ @Override
+ public int size() {
+ return this.config.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return this.config.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return this.config.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return this.config.containsValue(value);
+ }
+
+ @Override
+ public Object get(Object key) {
+ return this.config.get(key);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object put(Object key, Object value) {
+ return this.config.put(key, value);
+ }
+
+ @Override
+ public Object remove(Object key) {
+ return this.config.remove(key);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putAll(Map m) {
+ this.config.putAll(m);
+ }
+
+ @Override
+ public void clear() {
+ this.config.clear();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Set<Object> keySet() {
+ return this.config.keySet();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Collection<Object> values() {
+ return this.config.values();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Set<java.util.Map.Entry<Object, Object>> entrySet() {
+ return this.config.entrySet();
+ }
+
+}
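The documentation change in this commit notes that Flink's `Configuration` only allows `String` keys, while Storm passes a raw `Map`. The following standalone sketch is not part of the commit and uses only `java.util`. It illustrates why a raw-`Map` wrapper such as `StormConfig` is needed: forcing keys into `String` form can merge entries that a raw map keeps apart. The class `RawKeySketch` and its `stringify` helper are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class RawKeySketch {

    /**
     * Hypothetical helper: copies a raw map into a String-keyed map by
     * converting every key with String.valueOf(...).
     */
    public static Map<String, Object> stringify(Map<?, ?> raw) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<?, ?> entry : raw.entrySet()) {
            result.put(String.valueOf(entry.getKey()), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // A raw map may mix key types, as Storm's configuration Map allows.
        Map<Object, Object> raw = new HashMap<>();
        raw.put(1, "integer key");
        raw.put("1", "string key");

        // The raw map keeps both entries; the String-keyed copy cannot.
        System.out.println("raw entries: " + raw.size());
        System.out.println("String-keyed entries: " + stringify(raw).size());
    }
}
```

Because `StormConfig` delegates to an untyped `HashMap`, it avoids this kind of key conversion entirely.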
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 62059fe..c531580 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -23,9 +23,11 @@ import java.util.HashMap;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -99,7 +101,19 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
- this.spout.open(null,
+
+ GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ StormConfig stormConfig = new StormConfig();
+
+ if (config != null) {
+ if (config instanceof StormConfig) {
+ stormConfig = (StormConfig) config;
+ } else {
+ stormConfig.putAll(config.toMap());
+ }
+ }
+
+ this.spout.open(stormConfig,
StormWrapperSetupHelper
.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
new SpoutOutputCollector(this.collector));
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index c4ba9ba..6b58b0a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -24,11 +24,13 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -205,7 +207,18 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
this.numberOfAttributes, flinkCollector));
}
- this.bolt.prepare(null, topologyContext, stormCollector);
+ GlobalJobParameters config = super.executionConfig.getGlobalJobParameters();
+ StormConfig stormConfig = new StormConfig();
+
+ if (config != null) {
+ if (config instanceof StormConfig) {
+ stormConfig = (StormConfig) config;
+ } else {
+ stormConfig.putAll(config.toMap());
+ }
+ }
+
+ this.bolt.prepare(stormConfig, topologyContext, stormCollector);
}
@Override
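Both `AbstractStormSpoutWrapper.run(...)` and `StormBoltWrapper.open(...)` apply the same fallback before handing a configuration to the wrapped Spout or Bolt: no parameters yield an empty map, a `StormConfig` is passed through as-is, and any other `GlobalJobParameters` is copied via `toMap()`. The sketch below reproduces only that decision logic with plain Java stand-ins so it can be run without Flink or Storm on the classpath. All type names in it (`JobParameters`, `MapBackedParameters`, `StringParameters`) are hypothetical substitutes, not classes from this commit.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigResolutionSketch {

    /** Hypothetical stand-in for Flink's GlobalJobParameters. */
    public static class JobParameters {
        public Map<String, String> toMap() {
            return new HashMap<>();
        }
    }

    /** Hypothetical stand-in for StormConfig: parameters backed by an untyped map. */
    public static class MapBackedParameters extends JobParameters {
        public final Map<Object, Object> values = new HashMap<>();
    }

    /** Hypothetical stand-in for a String-keyed Flink configuration. */
    public static class StringParameters extends JobParameters {
        private final Map<String, String> entries = new HashMap<>();

        public StringParameters set(String key, String value) {
            entries.put(key, value);
            return this;
        }

        @Override
        public Map<String, String> toMap() {
            return new HashMap<>(entries);
        }
    }

    /** Mirrors the three cases handled by the wrappers in the diff above. */
    public static Map<Object, Object> resolve(JobParameters params) {
        if (params instanceof MapBackedParameters) {
            // Map-backed parameters are forwarded unchanged (same instance).
            return ((MapBackedParameters) params).values;
        }
        Map<Object, Object> resolved = new HashMap<>();
        if (params != null) {
            // Any other parameter type is copied through its String view.
            resolved.putAll(params.toMap());
        }
        // A null argument falls through and yields an empty map, never null.
        return resolved;
    }

    public static void main(String[] args) {
        MapBackedParameters storm = new MapBackedParameters();
        storm.values.put(42, "non-String key");

        System.out.println(resolve(null).isEmpty());
        System.out.println(resolve(storm) == storm.values);
        System.out.println(resolve(new StringParameters().set("k", "v")));
    }
}
```

With this fallback, a Spout or Bolt never receives `null` as its configuration map, which matches the first case exercised by `testOpen()` in this commit.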
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
index 776e65d..381e130 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.stormcompatibility.wrappers;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Test;
@@ -37,14 +38,14 @@ public class FiniteStormSpoutWrapperTest {
@SuppressWarnings("unchecked")
@Test
public void runAndExecuteTest1() throws Exception {
-
- FiniteStormSpout stormSpout =
- mock(FiniteStormSpout.class);
+ final FiniteStormSpout stormSpout = mock(FiniteStormSpout.class);
when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
- FiniteStormSpoutWrapper<?> wrapper =
- new FiniteStormSpoutWrapper<Object>(stormSpout);
- wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+ final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
+ wrapper.setRuntimeContext(taskContext);
wrapper.run(mock(SourceContext.class));
verify(stormSpout, times(3)).nextTuple();
@@ -53,14 +54,14 @@ public class FiniteStormSpoutWrapperTest {
@SuppressWarnings("unchecked")
@Test
public void runAndExecuteTest2() throws Exception {
-
- FiniteStormSpout stormSpout =
- mock(FiniteStormSpout.class);
+ final FiniteStormSpout stormSpout = mock(FiniteStormSpout.class);
when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
- FiniteStormSpoutWrapper<?> wrapper =
- new FiniteStormSpoutWrapper<Object>(stormSpout);
- wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+ final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
+ wrapper.setRuntimeContext(taskContext);
wrapper.run(mock(SourceContext.class));
verify(stormSpout, never()).nextTuple();
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
index 96b5aea..eef35cf 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
@@ -38,8 +38,7 @@ class FiniteTestSpout implements IRichSpout {
@SuppressWarnings("rawtypes")
@Override
- public void open(final Map conf, final TopologyContext context,
- @SuppressWarnings("hiding") final SpoutOutputCollector collector) {
+ public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
this.collector = collector;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index db34096..5cfb151 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -25,11 +25,13 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -44,7 +46,14 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.HashSet;
import java.util.Map;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
@@ -105,7 +114,7 @@ public class StormBoltWrapperTest extends AbstractTest {
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
}
- String[] schema;
+ final String[] schema;
if (numberOfAttributes == -1) {
schema = new String[1];
} else {
@@ -123,6 +132,7 @@ public class StormBoltWrapperTest extends AbstractTest {
}
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
final IRichBolt bolt = mock(IRichBolt.class);
@@ -132,7 +142,7 @@ public class StormBoltWrapperTest extends AbstractTest {
final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
wrapper.setup(mock(Output.class), taskContext);
- wrapper.open(new Configuration());
+ wrapper.open(null);
wrapper.processElement(record);
if (numberOfAttributes == -1) {
@@ -152,10 +162,12 @@ public class StormBoltWrapperTest extends AbstractTest {
when(record.getValue()).thenReturn(2).thenReturn(3);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- Output output = mock(Output.class);
+ when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- TestBolt bolt = new TestBolt();
- HashSet<String> raw = new HashSet<String>();
+ final Output output = mock(Output.class);
+
+ final TestBolt bolt = new TestBolt();
+ final HashSet<String> raw = new HashSet<String>();
if (rawOutType1) {
raw.add("stream1");
}
@@ -165,9 +177,9 @@ public class StormBoltWrapperTest extends AbstractTest {
final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null, raw);
wrapper.setup(output, taskContext);
- wrapper.open(new Configuration());
+ wrapper.open(null);
- SplitStreamType splitRecord = new SplitStreamType<Integer>();
+ final SplitStreamType splitRecord = new SplitStreamType<Integer>();
if (rawOutType1) {
splitRecord.streamId = "stream1";
splitRecord.value = 2;
@@ -192,30 +204,70 @@ public class StormBoltWrapperTest extends AbstractTest {
@SuppressWarnings("unchecked")
@Test
public void testOpen() throws Exception {
- final IRichBolt bolt = mock(IRichBolt.class);
+ final StormConfig stormConfig = new StormConfig();
+ final Configuration flinkConfig = new Configuration();
+
+ final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+ when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+ .thenReturn(flinkConfig);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
- wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
-
- wrapper.open(mock(Configuration.class));
+ wrapper.setup(mock(Output.class), taskContext);
+ // test without configuration
+ wrapper.open(null);
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+
+ // test with StormConfig
+ wrapper.open(null);
+ verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+ any(OutputCollector.class));
+
+ // test with Configuration
+ wrapper.open(null);
+ verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
+ any(OutputCollector.class));
}
@SuppressWarnings("unchecked")
@Test
public void testOpenSink() throws Exception {
+ final StormConfig stormConfig = new StormConfig();
+ final Configuration flinkConfig = new Configuration();
+
+ final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+ when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+ .thenReturn(flinkConfig);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
- wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
+ wrapper.setup(mock(Output.class), taskContext);
- wrapper.open(mock(Configuration.class));
+ // test without configuration
+ wrapper.open(null);
+ verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
+ isNull(OutputCollector.class));
- verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
+ // test with StormConfig
+ wrapper.open(null);
+ verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+ isNull(OutputCollector.class));
+
+ // test with Configuration
+ wrapper.open(null);
+ verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
+ isNull(OutputCollector.class));
}
@SuppressWarnings("unchecked")
@@ -230,12 +282,11 @@ public class StormBoltWrapperTest extends AbstractTest {
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- // when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
wrapper.setup(mock(Output.class), taskContext);
wrapper.close();
wrapper.dispose();
-
+
verify(bolt).cleanup();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index c890ab1..a4eea7e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -36,6 +37,7 @@ import java.util.LinkedList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
@@ -48,10 +50,13 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
final IRichSpout spout = mock(IRichSpout.class);
final int numberOfCalls = this.r.nextInt(50);
final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
- spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+ spoutWrapper.setRuntimeContext(taskContext);
spoutWrapper.run(mock(SourceContext.class));
verify(spout, times(numberOfCalls)).nextTuple();
@@ -66,10 +71,13 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
expectedResult.add(new Tuple1<Integer>(new Integer(i)));
}
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
spout);
- spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+ spoutWrapper.setRuntimeContext(taskContext);
final TestContext collector = new TestContext();
spoutWrapper.run(collector);
@@ -84,10 +92,13 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
+ StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
spout);
- spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+ spoutWrapper.setRuntimeContext(taskContext);
spoutWrapper.cancel();
final TestContext collector = new TestContext();
@@ -96,15 +107,4 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
Assert.assertEquals(expectedResult, collector.result);
}
- @Test
- public void testClose() throws Exception {
- final IRichSpout spout = mock(IRichSpout.class);
- final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
- spout);
-
- spoutWrapper.close();
-
- verify(spout).close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 6d2f196..04dc48d 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -17,9 +17,16 @@
package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.StormConfig;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
@@ -28,21 +35,64 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.LinkedList;
+import java.util.Map;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormSpoutWrapperTest extends AbstractTest {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testRunPrepare() throws Exception {
+ final StormConfig stormConfig = new StormConfig();
+ final Configuration flinkConfig = new Configuration();
+
+ final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+ when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+ .thenReturn(flinkConfig);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+
+ final IRichSpout spout = mock(IRichSpout.class);
+ final StormSpoutWrapper spoutWrapper = new StormSpoutWrapper(spout);
+ spoutWrapper.setRuntimeContext(taskContext);
+ spoutWrapper.isRunning = false;
+
+ // test without configuration
+ spoutWrapper.run(mock(SourceContext.class));
+ verify(spout).open(any(Map.class), any(TopologyContext.class),
+ any(SpoutOutputCollector.class));
+
+ // test with StormConfig
+ spoutWrapper.run(mock(SourceContext.class));
+ verify(spout).open(same(stormConfig), any(TopologyContext.class),
+ any(SpoutOutputCollector.class));
+
+ // test with Configuration
+ spoutWrapper.run(mock(SourceContext.class));
+ verify(spout, times(3)).open(eq(flinkConfig.toMap()), any(TopologyContext.class),
+ any(SpoutOutputCollector.class));
+ }
+
@Test
public void testRunExecuteCancelInfinite() throws Exception {
final int numberOfCalls = 5 + this.r.nextInt(5);
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
- spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+ spoutWrapper.setRuntimeContext(taskContext);
spoutWrapper.cancel();
final TestContext collector = new TestContext();
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
index b7c98a8..d8d620b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
@@ -93,19 +93,25 @@ public class ExclamationTopology {
private static boolean fileInputOutput = false;
private static String textPath;
private static String outputPath;
+ private static int exclamationNum = 3;
+
+ static int getExclamation() {
+ return exclamationNum;
+ }
static boolean parseParameters(final String[] args) {
if (args.length > 0) {
// parse input arguments
fileInputOutput = true;
- if (args.length == 2) {
+ if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+ exclamationNum = Integer.parseInt(args[2]);
} else {
System.err.println(
"Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text " +
- "path> <result path>");
+ "path> <result path> <number of exclamation marks>");
return false;
}
} else {
@@ -113,7 +119,7 @@ public class ExclamationTopology {
System.out.println(" Provide parameters to read input data from a file");
System.out.println(
" Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>" +
- " <result path>");
+ " <result path> <number of exclamation marks>");
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
index ee5d9f9..c8af3a6 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,7 +37,7 @@ import backtype.storm.utils.Utils;
* The input is a plain text file with lines separated by newline characters.
* <p/>
* <p/>
- * Usage: <code>StormExclamationWithStormBolt <text path> <result path></code><br/>
+ * Usage: <code>StormExclamationWithStormBolt <text path> <result path> <number of exclamation marks></code><br/>
* If no parameters are provided, the program is run with default data from
* {@link WordCountData}.
* <p/>
@@ -61,15 +62,20 @@ public class ExclamationWithStormBolt {
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // set Storm configuration
+ StormConfig config = new StormConfig();
+ config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum));
+ env.getConfig().setGlobalJobParameters(config);
+
// get input data
final DataStream<String> text = getTextDataStream(env);
final DataStream<String> exclaimed = text
.transform("StormBoltTokenizer",
TypeExtractor.getForObject(""),
- new StormBoltWrapper<String, String>(new ExclamationBolt(),
- new String[] { Utils.DEFAULT_STREAM_ID }))
- .map(new ExclamationMap());
+ new StormBoltWrapper<String, String>(new ExclamationBolt(),
+ new String[] { Utils.DEFAULT_STREAM_ID }))
+ .map(new ExclamationMap());
// emit result
if (fileOutput) {
@@ -87,6 +93,7 @@ public class ExclamationWithStormBolt {
// *************************************************************************
private static class ExclamationMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = 4614754344067170619L;
@Override
public String map(String value) throws Exception {
@@ -101,23 +108,25 @@ public class ExclamationWithStormBolt {
private static boolean fileOutput = false;
private static String textPath;
private static String outputPath;
+ private static int exclamationNum = 3;
private static boolean parseParameters(final String[] args) {
if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if (args.length == 2) {
+ if (args.length == 3) {
textPath = args[0];
outputPath = args[1];
+ exclamationNum = Integer.parseInt(args[2]);
} else {
- System.err.println("Usage: ExclamationWithStormBolt <text path> <result path>");
+ System.err.println("Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
return false;
}
} else {
System.out.println("Executing ExclamationWithStormBolt example with built-in default data");
System.out.println(" Provide parameters to read input data from a file");
- System.out.println(" Usage: ExclamationWithStormBolt <text path> <result path>");
+ System.out.println(" Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
}
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
index 962a318..99c816d 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
+import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -125,11 +126,16 @@ public class ExclamationWithStormSpout {
private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
if (fileOutput) {
- // read the text file from given input path
final String[] tokens = textPath.split(":");
- final String localFile = tokens[tokens.length - 1];
+ final String inputFile = tokens[tokens.length - 1];
+
+ // set Storm configuration
+ StormConfig config = new StormConfig();
+ config.put(FiniteStormFileSpout.INPUT_FILE_PATH, inputFile);
+ env.getConfig().setGlobalJobParameters(config);
+
return env.addSource(
- new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile),
+ new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(),
new String[] { Utils.DEFAULT_STREAM_ID }),
TypeExtractor.getForClass(String.class)).setParallelism(1);
}
@@ -137,7 +143,7 @@ public class ExclamationWithStormSpout {
return env.addSource(
new FiniteStormSpoutWrapper<String>(new FiniteStormInMemorySpout(
WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
- TypeExtractor.getForClass(String.class)).setParallelism(1);
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
index bd1220c..c87b3a5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
@@ -17,9 +17,12 @@
package org.apache.flink.stormcompatibility.excamation;
+import backtype.storm.Config;
import backtype.storm.utils.Utils;
+
import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
/**
* Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
@@ -61,8 +64,10 @@ public class StormExclamationLocal {
final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
// execute program locally
+ Config conf = new Config();
+ conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
- cluster.submitTopology(topologyId, null, builder.createTopology());
+ cluster.submitTopology(topologyId, conf, builder.createTopology());
Utils.sleep(10 * 1000);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
index 14232b7..2709eff 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
@@ -29,12 +29,29 @@ import backtype.storm.tuple.Values;
import java.util.Map;
public class ExclamationBolt implements IRichBolt {
- OutputCollector _collector;
+ private final static long serialVersionUID = -6364882114201311380L;
+
+ public final static String EXCLAMATION_COUNT = "exclamation.count";
+
+ private OutputCollector collector;
+ private String exclamation;
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
+ this.collector = collector;
+
+ Object count = conf.get(EXCLAMATION_COUNT);
+ if (count != null) {
+ int exclamationNum = (Integer) count;
+ StringBuilder builder = new StringBuilder();
+ for (int index = 0; index < exclamationNum; ++index) {
+ builder.append('!');
+ }
+ this.exclamation = builder.toString();
+ } else {
+ this.exclamation = "!";
+ }
}
@Override
@@ -43,7 +60,7 @@ public class ExclamationBolt implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+ collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
index dddbb4b..64b3e28 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -35,6 +35,8 @@ public class FiniteStormFileSpout extends StormFileSpout implements FiniteStormS
private String line;
private boolean newLineRead;
+ public FiniteStormFileSpout() {}
+
public FiniteStormFileSpout(String path) {
super(path);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
index 7d89c75..0611e37 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
@@ -33,9 +33,13 @@ import java.util.Map;
public class StormFileSpout extends AbstractStormSpout {
private static final long serialVersionUID = -6996907090003590436L;
- protected final String path;
+ public final static String INPUT_FILE_PATH = "input.path";
+
+ protected String path = null;
protected BufferedReader reader;
+ public StormFileSpout() {}
+
public StormFileSpout(final String path) {
this.path = path;
}
@@ -44,6 +48,12 @@ public class StormFileSpout extends AbstractStormSpout {
@Override
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
super.open(conf, context, collector);
+
+ Object configuredPath = conf.get(INPUT_FILE_PATH);
+ if(configuredPath != null) {
+ this.path = (String)configuredPath;
+ }
+
try {
this.reader = new BufferedReader(new FileReader(this.path));
} catch (final FileNotFoundException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
index f47a58f..a858f36 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
@@ -27,11 +27,13 @@ public class ExclamationWithStormBoltITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
+ protected String exclamationNum;
@Override
protected void preSubmit() throws Exception {
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
+ this.exclamationNum = "3";
}
@Override
@@ -41,7 +43,7 @@ public class ExclamationWithStormBoltITCase extends StormTestBase {
@Override
protected void testProgram() throws Exception {
- ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath});
+ ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
index 6cba39a..a19f3af 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -27,11 +27,13 @@ public class StormExclamationLocalITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
+ protected String exclamationNum;
@Override
protected void preSubmit() throws Exception {
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
+ this.exclamationNum = "3";
}
@Override
@@ -41,6 +43,6 @@ public class StormExclamationLocalITCase extends StormTestBase {
@Override
protected void testProgram() throws Exception {
- StormExclamationLocal.main(new String[]{this.textPath, this.resultPath});
+ StormExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fe285a7/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 28f3b92..df0248a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -647,7 +647,7 @@ public class ExecutionConfig implements Serializable {
* Abstract class for a custom user configuration object registered at the execution config.
*
* This user config is accessible at runtime through
- * getRuntimeContext().getExecutionConfig().getUserConfig()
+ * getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
*/
public static class GlobalJobParameters implements Serializable {
private static final long serialVersionUID = 1L;