Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:40 UTC
[15/15] flink git commit: [FLINK-2566] FlinkTopologyContext not
populated completely - extended FlinkTopologyContext to be populated with all
supportable attributes - added JUnit test - updated README.md additionally:
module restructuring to get cle
[FLINK-2566] FlinkTopologyContext not populated completely
- extended FlinkTopologyContext to be populated with all supportable attributes
- added JUnit test
- updated README.md
additionally: module restructuring to get cleaner package structure
This closes #1135
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a67a60f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a67a60f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a67a60f
Branch: refs/heads/master
Commit: 7a67a60f934123863ca96a95e30471c99bb8088a
Parents: 39115ab
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Tue Sep 15 23:59:31 2015 +0200
Committer: mjsax <mj...@informatik.hu-berlin.de>
Committed: Tue Oct 6 13:29:32 2015 +0200
----------------------------------------------------------------------
.../flink-storm-compatibility-core/README.md | 2 +-
.../api/FlinkOutputFieldsDeclarer.java | 170 ----------------
.../stormcompatibility/api/FlinkTopology.java | 16 +-
.../api/FlinkTopologyBuilder.java | 39 ++--
.../api/FlinkTopologyContext.java | 161 ---------------
.../util/FiniteStormSpout.java | 39 ++++
.../util/FlinkOutputFieldsDeclarer.java | 168 ++++++++++++++++
.../util/FlinkStormStreamSelector.java | 2 +-
.../util/FlinkTopologyContext.java | 164 ++++++++++++++++
.../util/SplitStreamTypeKeySelector.java | 46 +++++
.../wrappers/AbstractStormSpoutWrapper.java | 36 ++--
.../wrappers/FiniteStormSpout.java | 37 ----
.../wrappers/FiniteStormSpoutWrapper.java | 1 +
.../wrappers/SetupOutputFieldsDeclarer.java | 63 ++++++
.../wrappers/StormBoltWrapper.java | 17 +-
.../wrappers/StormOutputFieldsDeclarer.java | 63 ------
.../wrappers/StormWrapperSetupHelper.java | 192 ++++++++++++++++--
.../api/FlinkOutputFieldsDeclarerTest.java | 193 ------------------
.../api/FlinkTopologyContextTest.java | 74 -------
.../api/FlinkTopologyTest.java | 10 +-
.../api/TestTopologyBuilder.java | 27 +++
.../util/FiniteTestSpout.java | 77 ++++++++
.../util/FlinkOutputFieldsDeclarerTest.java | 193 ++++++++++++++++++
.../util/FlinkStormStreamSelectorTest.java | 51 +++++
.../util/FlinkTopologyContextTest.java | 114 +++++++++++
.../stormcompatibility/util/TestDummyBolt.java | 20 +-
.../stormcompatibility/util/TestDummySpout.java | 17 +-
.../flink/stormcompatibility/util/TestSink.java | 16 +-
.../wrappers/FiniteStormSpoutWrapperTest.java | 6 +
.../wrappers/FiniteTestSpout.java | 77 --------
.../wrappers/FlinkStormStreamSelectorTest.java | 51 -----
.../wrappers/SetupOutputFieldsDeclarerTest.java | 91 +++++++++
.../wrappers/StormBoltWrapperTest.java | 36 ++--
.../wrappers/StormFiniteSpoutWrapperTest.java | 13 +-
.../wrappers/StormOutputFieldsDeclarerTest.java | 91 ---------
.../wrappers/StormSpoutWrapperTest.java | 6 +
.../wrappers/StormWrapperSetupHelperTest.java | 194 ++++++++++++++++++-
.../util/FiniteStormFileSpout.java | 2 +-
.../util/FiniteStormInMemorySpout.java | 2 +-
.../split/SplitBoltTopology.java | 4 +-
40 files changed, 1565 insertions(+), 1016 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index f42dc24..9663fc7 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -3,7 +3,7 @@
The Storm compatibility layer allows spouts and bolts to be embedded unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
The following Storm features are not (yet/fully) supported by the compatibility layer right now:
-* topology and tuple meta information (ie, `TopologyContext` not fully supported)
+* tuple meta information
* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
* for whole Storm topologies the following is not supported by Flink:
* direct emit connection pattern
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
deleted file mode 100644
index e2f6332..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
- * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt
- * bolt}.<br />
- * <br />
- * <strong>CAUTION: Currently, Flink does only support the default output stream. Furthermore,
- * direct emit is not supported.</strong>
- */
-final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
- /** the declared output streams and schemas */
- final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
-
- @Override
- public void declare(final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
- }
-
- /**
- * {@inheritDoc}
- * <p/>
- * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}.
- *
- * @throws UnsupportedOperationException
- * if {@code direct} is {@code true}
- */
- @Override
- public void declare(final boolean direct, final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
- }
-
- @Override
- public void declareStream(final String streamId, final Fields fields) {
- this.declareStream(streamId, false, fields);
- }
-
- /**
- * {@inheritDoc}
- * <p/>
- * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}.
- *
- * @throws UnsupportedOperationException
- * if {@code direct} is {@code true}
- */
- @Override
- public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if (direct) {
- throw new UnsupportedOperationException("Direct emit is not supported by Flink");
- }
-
- this.outputStreams.put(streamId, fields);
- }
-
- /**
- * Returns {@link TypeInformation} for the declared output schema for a specific stream.
- *
- * @param streamId
- * A stream ID.
- *
- * @return output type information for the declared output schema of the specified stream; or {@code null} if
- * {@code streamId == null}
- *
- * @throws IllegalArgumentException
- * If no output schema was declared for the specified stream or if more then 25 attributes got declared.
- */
- public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
- if (streamId == null) {
- return null;
- }
-
- Fields outputSchema = this.outputStreams.get(streamId);
- if (outputSchema == null) {
- throw new IllegalArgumentException("Stream with ID '" + streamId
- + "' was not declared.");
- }
-
- Tuple t;
- final int numberOfAttributes = outputSchema.size();
-
- if (numberOfAttributes == 1) {
- return TypeExtractor.getForClass(Object.class);
- } else if (numberOfAttributes <= 25) {
- try {
- t = Tuple.getTupleClass(numberOfAttributes).newInstance();
- } catch (final InstantiationException e) {
- throw new RuntimeException(e);
- } catch (final IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- } else {
- throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
- }
-
- // TODO: declare only key fields as DefaultComparable
- for (int i = 0; i < numberOfAttributes; ++i) {
- t.setField(new DefaultComparable(), i);
- }
-
- return TypeExtractor.getForObject(t);
- }
-
- /**
- * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
- * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
- * Flink cannot use them and will throw an exception.
- */
- private static class DefaultComparable implements Comparable<DefaultComparable> {
-
- public DefaultComparable() {
- }
-
- @Override
- public int compareTo(final DefaultComparable o) {
- return 0;
- }
- }
-
- /**
- * Computes the indexes within the declared output schema of the specified stream, for a list of given
- * field-grouping attributes.
- *
- * @param streamId
- * A stream ID.
- * @param groupingFields
- * The names of the key fields.
- *
- * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
- * list
- */
- public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
- final int[] fieldIndexes = new int[groupingFields.size()];
-
- for (int i = 0; i < fieldIndexes.length; ++i) {
- fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
- }
-
- return fieldIndexes;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
index 8c75a2c..179466e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
@@ -28,17 +28,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
* {@link FlinkClient}.
*/
-class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology extends StreamExecutionEnvironment {
- /** The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology} */
- private final StormTopology stormTopology;
/** The number of declared tasks for the whole program (ie, sum over all dops) */
private int numberOfTasks = 0;
- public FlinkTopology(final StormTopology stormTopology) {
+ public FlinkTopology() {
// Set default parallelism to 1, to mirror Storm default behavior
super.setParallelism(1);
- this.stormTopology = stormTopology;
}
/**
@@ -52,7 +49,7 @@ class FlinkTopology extends StreamExecutionEnvironment {
public JobExecutionResult execute() throws Exception {
throw new UnsupportedOperationException(
"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
- "instead.");
+ "instead.");
}
/**
@@ -66,12 +63,7 @@ class FlinkTopology extends StreamExecutionEnvironment {
public JobExecutionResult execute(final String jobName) throws Exception {
throw new UnsupportedOperationException(
"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
- "instead.");
- }
-
- //TODO
- public String getStormTopologyAsString() {
- return this.stormTopology.toString();
+ "instead.");
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index e4f6c94..d62d56b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -16,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.stormcompatibility.api;
import backtype.storm.generated.ComponentCommon;
@@ -35,10 +34,12 @@ import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.util.SplitStreamTypeKeySelector;
import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
@@ -73,14 +74,18 @@ public class FlinkTopologyBuilder {
private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
/** All spouts&bolts declarers by their ID */
private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+ // needs to be a class member for internal testing purpose
+ private StormTopology stormTopology;
+
/**
* Creates a Flink program that uses the specified spouts and bolts.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public FlinkTopology createTopology() {
- final StormTopology stormTopology = this.stormBuilder.createTopology();
- final FlinkTopology env = new FlinkTopology(stormTopology);
+ this.stormTopology = this.stormBuilder.createTopology();
+
+ final FlinkTopology env = new FlinkTopology();
env.setParallelism(1);
final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
@@ -102,6 +107,7 @@ public class FlinkTopologyBuilder {
} else {
spoutWrapper = new StormSpoutWrapper(userSpout);
}
+ spoutWrapper.setStormTopology(stormTopology);
DataStreamSource source;
HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
@@ -126,6 +132,8 @@ public class FlinkTopologyBuilder {
if (common.is_set_parallelism_hint()) {
dop = common.get_parallelism_hint();
source.setParallelism(dop);
+ } else {
+ common.set_parallelism_hint(1);
}
env.increaseNumberOfTasks(dop);
}
@@ -217,6 +225,7 @@ public class FlinkTopologyBuilder {
}
SingleOutputStreamOperator outputStream;
+ StormBoltWrapper boltWrapper;
if (boltOutputStreams.size() < 2) { // single output stream or sink
String outputStreamId = null;
if (boltOutputStreams.size() == 1) {
@@ -225,11 +234,9 @@ public class FlinkTopologyBuilder {
final TypeInformation<?> outType = declarer
.getOutputType(outputStreamId);
- outputStream = inputStream.transform(
- boltId,
- outType,
- new StormBoltWrapper(userBolt, this.outputStreams.get(
- producerId).get(inputStreamId)));
+ boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams
+ .get(producerId).get(inputStreamId));
+ outputStream = inputStream.transform(boltId, outType, boltWrapper);
if (outType != null) {
// only for non-sink nodes
@@ -241,11 +248,8 @@ public class FlinkTopologyBuilder {
final TypeInformation<?> outType = TypeExtractor
.getForClass(SplitStreamType.class);
- outputStream = inputStream.transform(
- boltId,
- outType,
- new StormBoltWrapper(userBolt, this.outputStreams.get(
- producerId).get(inputStreamId)));
+ boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId));
+ outputStream = inputStream.transform(boltId, outType, boltWrapper);
SplitStream splitStreams = outputStream
.split(new FlinkStormStreamSelector());
@@ -256,11 +260,14 @@ public class FlinkTopologyBuilder {
}
availableInputs.put(boltId, op);
}
+ boltWrapper.setStormTopology(stormTopology);
int dop = 1;
if (common.is_set_parallelism_hint()) {
dop = common.get_parallelism_hint();
outputStream.setParallelism(dop);
+ } else {
+ common.set_parallelism_hint(1);
}
env.increaseNumberOfTasks(dop);
@@ -393,4 +400,8 @@ public class FlinkTopologyBuilder {
* }
*/
+ // for internal testing purpose only
+ StormTopology getStormTopology() {
+ return this.stormTopology;
+ }
}
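Note on the parallelism-hint change above: the builder now writes a hint of 1 back into components that declared none, presumably so that later code can rely on every component carrying an explicit hint. The following is a minimal, self-contained sketch of that pattern only. `Component` is a hypothetical stand-in for Storm's `ComponentCommon`, and `normalizeAndCountTasks` is an illustrative name that does not appear in the commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ParallelismHintSketch {

    /** Hypothetical stand-in for Storm's ComponentCommon; only the hint is modelled. */
    static class Component {
        private Integer parallelismHint; // null means "no hint set"

        boolean isSetParallelismHint() { return parallelismHint != null; }
        int getParallelismHint() { return parallelismHint; }
        void setParallelismHint(int hint) { parallelismHint = hint; }
    }

    /**
     * Sums the degree of parallelism (dop) over all components and
     * writes back a hint of 1 wherever none was declared.
     */
    static int normalizeAndCountTasks(Map<String, Component> components) {
        int numberOfTasks = 0;
        for (Component common : components.values()) {
            int dop = 1; // default mirrors Storm's behaviour
            if (common.isSetParallelismHint()) {
                dop = common.getParallelismHint();
            } else {
                common.setParallelismHint(1);
            }
            numberOfTasks += dop;
        }
        return numberOfTasks;
    }

    public static void main(String[] args) {
        Map<String, Component> components = new LinkedHashMap<>();
        Component spout = new Component();
        spout.setParallelismHint(3);
        Component bolt = new Component(); // no hint declared
        components.put("spout", spout);
        components.put("bolt", bolt);

        System.out.println("tasks: " + normalizeAndCountTasks(components));
        System.out.println("bolt hint after normalization: " + bolt.getParallelismHint());
    }
}
```

After normalization, a consumer can read the hint of any component without checking whether it was set.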
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
deleted file mode 100644
index a761617..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.task.TopologyContext;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites certain method that are not applicable when
- * a Storm topology is executed within Flink.
- */
-public class FlinkTopologyContext extends TopologyContext {
-
- /**
- * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
- * for each parallel task
- *
- * @param topology
- * The Storm topology that is currently executed
- * @param taskToComponents
- * A map from task IDs to Component IDs
- * @param taskId
- * The ID of the task the context belongs to.
- */
- public FlinkTopologyContext(final StormTopology topology, final Map<Integer, String> taskToComponents,
- final Integer taskId) {
- super(topology, null, taskToComponents, null, null, null, null, null, taskId, null, null, null, null, null,
- null, null);
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public void addTaskHook(final ITaskHook hook) {
- throw new UnsupportedOperationException("Task hooks are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public Collection<ITaskHook> getHooks() {
- throw new UnsupportedOperationException("Task hooks are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public IMetric getRegisteredMetricByName(final String name) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
-
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @SuppressWarnings("rawtypes")
- @Override
- public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @SuppressWarnings("rawtypes")
- @Override
- public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @SuppressWarnings("unchecked")
- @Override
- public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public <T extends ISubscribedState> T setAllSubscribedState(final T obj) {
- throw new UnsupportedOperationException("Not supported by Flink");
-
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) {
- throw new UnsupportedOperationException("Not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T
- obj) {
- throw new UnsupportedOperationException("Not supported by Flink");
- }
-
-}
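Note on the class removed above: it subclasses the Storm context and overrides every operation that has no Flink counterpart so that it fails loudly rather than silently doing nothing. A minimal sketch of that pattern follows. `BaseContext` and `RestrictedContext` are hypothetical names, and hooks are modelled as plain strings instead of Storm's `ITaskHook`.

```java
import java.util.ArrayList;
import java.util.Collection;

class UnsupportedContextSketch {

    /** Hypothetical base context; stands in for Storm's TopologyContext. */
    static class BaseContext {
        private final int taskId;
        private final Collection<String> hooks = new ArrayList<>();

        BaseContext(int taskId) { this.taskId = taskId; }

        public int getThisTaskId() { return taskId; }
        public void addTaskHook(String hook) { hooks.add(hook); }
        public Collection<String> getHooks() { return hooks; }
    }

    /** Keeps the supported accessors and rejects the unsupported ones. */
    static class RestrictedContext extends BaseContext {
        RestrictedContext(int taskId) { super(taskId); }

        @Override
        public void addTaskHook(String hook) {
            throw new UnsupportedOperationException("Task hooks are not supported");
        }

        @Override
        public Collection<String> getHooks() {
            throw new UnsupportedOperationException("Task hooks are not supported");
        }
    }

    public static void main(String[] args) {
        BaseContext context = new RestrictedContext(7);
        System.out.println("task id: " + context.getThisTaskId());
        try {
            context.addTaskHook("my-hook");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing at every invocation makes an unsupported feature visible at the first call instead of producing a topology that quietly behaves differently from Storm.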
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
new file mode 100644
index 0000000..114fa7c
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * This interface represents a Storm spout that emits a finite number of records. Common Storm
+ * spouts emit infinite streams by default. To change this behaviour and take advantage of
+ * Flink's finite-source capabilities, the spout should implement this interface. To wrap
+ * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
+ */
+public interface FiniteStormSpout extends IRichSpout {
+
+ /**
+ * Returns true once the spout has reached the end of the stream.
+ *
+ * @return true, if the spout's stream reached its end, false otherwise
+ */
+ public boolean reachedEnd();
+
+}
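Note on the interface above: `reachedEnd()` gives a wrapper a termination condition to poll between emits. The sketch below illustrates that contract without any Storm or Flink dependency. `FiniteSource`, `CountingSource`, and `drain` are hypothetical names, and a `List` stands in for the output collector. It is not the code of `FiniteStormSpoutWrapper`.

```java
import java.util.ArrayList;
import java.util.List;

class FiniteSourceSketch {

    /** Hypothetical, simplified counterpart of FiniteStormSpout. */
    interface FiniteSource {
        /** Emits the next record into the given collector. */
        void nextTuple(List<String> collector);

        /** Returns true once no more records will be emitted. */
        boolean reachedEnd();
    }

    /** Emits a fixed number of records and then reports the end of the stream. */
    static class CountingSource implements FiniteSource {
        private final int limit;
        private int emitted = 0;

        CountingSource(int limit) { this.limit = limit; }

        @Override
        public void nextTuple(List<String> collector) {
            collector.add("record-" + emitted++);
        }

        @Override
        public boolean reachedEnd() {
            return emitted >= limit;
        }
    }

    /** Polls the source until it reports that its stream has ended. */
    static List<String> drain(FiniteSource source) {
        List<String> collected = new ArrayList<>();
        while (!source.reachedEnd()) {
            source.nextTuple(collected);
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CountingSource(3)));
    }
}
```

Checking `reachedEnd()` before each call to `nextTuple` means a source with zero records terminates without emitting anything.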
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
new file mode 100644
index 0000000..3eee8d6
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
+ * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br />
+ * <br />
+ * <strong>CAUTION: Flink does not support direct emit.</strong>
+ */
+public final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+ /** The declared output streams and schemas. */
+ public final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
+
+ @Override
+ public void declare(final Fields fields) {
+ this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+ *
+ * @throws UnsupportedOperationException
+ * if {@code direct} is {@code true}
+ */
+ @Override
+ public void declare(final boolean direct, final Fields fields) {
+ this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+ }
+
+ @Override
+ public void declareStream(final String streamId, final Fields fields) {
+ this.declareStream(streamId, false, fields);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+ *
+ * @throws UnsupportedOperationException
+ * if {@code direct} is {@code true}
+ */
+ @Override
+ public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+ if (direct) {
+ throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+ }
+
+ this.outputStreams.put(streamId, fields);
+ }
+
+ /**
+ * Returns {@link TypeInformation} for the declared output schema for a specific stream.
+ *
+ * @param streamId
+ * A stream ID.
+ *
+ * @return output type information for the declared output schema of the specified stream; or {@code null} if
+ * {@code streamId == null}
+ *
+ * @throws IllegalArgumentException
+ * If no output schema was declared for the specified stream or if more than 25 attributes were declared.
+ */
+ public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
+ if (streamId == null) {
+ return null;
+ }
+
+ Fields outputSchema = this.outputStreams.get(streamId);
+ if (outputSchema == null) {
+ throw new IllegalArgumentException("Stream with ID '" + streamId
+ + "' was not declared.");
+ }
+
+ Tuple t;
+ final int numberOfAttributes = outputSchema.size();
+
+ if (numberOfAttributes == 1) {
+ return TypeExtractor.getForClass(Object.class);
+ } else if (numberOfAttributes <= 25) {
+ try {
+ t = Tuple.getTupleClass(numberOfAttributes).newInstance();
+ } catch (final InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (final IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
+ }
+
+ // TODO: declare only key fields as DefaultComparable
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ t.setField(new DefaultComparable(), i);
+ }
+
+ return TypeExtractor.getForObject(t);
+ }
+
+ /**
+ * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
+ * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType(String)}. If key fields are not comparable,
+ * Flink cannot use them and will throw an exception.
+ */
+ private static class DefaultComparable implements Comparable<DefaultComparable> {
+
+ public DefaultComparable() {
+ }
+
+ @Override
+ public int compareTo(final DefaultComparable o) {
+ return 0;
+ }
+ }
+
+ /**
+ * Computes the indexes within the declared output schema of the specified stream, for a list of given
+ * field-grouping attributes.
+ *
+ * @param streamId
+ * A stream ID.
+ * @param groupingFields
+ * The names of the key fields.
+ *
+ * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
+ * list
+ */
+ public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
+ final int[] fieldIndexes = new int[groupingFields.size()];
+
+ for (int i = 0; i < fieldIndexes.length; ++i) {
+ fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
+ }
+
+ return fieldIndexes;
+ }
+
+}
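The index lookup done by getGroupingFieldIndexes(...) above can be illustrated without Storm on the classpath. The following is a minimal sketch, not the committed code: GroupingIndexSketch is a hypothetical name, and a schema is modeled as a plain List<String> standing in for Storm's Fields. Unlike the method in the diff, which does not check whether the stream was declared, the sketch rejects undeclared streams and unknown field names explicitly; that check is an assumption about desirable behavior, not something this commit does.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the declarer; a schema is an ordered list of field names. */
final class GroupingIndexSketch {

    /** Declared output schema per stream ID (stand-in for HashMap<String, Fields>). */
    private final Map<String, List<String>> outputStreams = new HashMap<>();

    void declareStream(String streamId, List<String> fields) {
        outputStreams.put(streamId, fields);
    }

    /** Maps each grouping field name to its position within the declared schema. */
    int[] getGroupingFieldIndexes(String streamId, List<String> groupingFields) {
        List<String> schema = outputStreams.get(streamId);
        if (schema == null) {
            // Assumption: fail with a clear message instead of a NullPointerException.
            throw new IllegalArgumentException("Stream with ID '" + streamId + "' was not declared.");
        }
        int[] indexes = new int[groupingFields.size()];
        for (int i = 0; i < indexes.length; ++i) {
            int index = schema.indexOf(groupingFields.get(i));
            if (index < 0) {
                throw new IllegalArgumentException("Unknown field '" + groupingFields.get(i) + "'.");
            }
            indexes[i] = index;
        }
        return indexes;
    }

    public static void main(String[] args) {
        GroupingIndexSketch declarer = new GroupingIndexSketch();
        declarer.declareStream("default", Arrays.asList("word", "count", "timestamp"));
        int[] indexes = declarer.getGroupingFieldIndexes("default", Arrays.asList("timestamp", "word"));
        System.out.println(Arrays.toString(indexes)); // prints [2, 0]
    }
}
```

The resulting index array is the kind of value that can be handed to a position-based key selector such as the SplitStreamTypeKeySelector added in this commit.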
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
index 7ca45d6..7e60a87 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
/**
* Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink.
*/
-final public class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
+public final class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
private static final long serialVersionUID = 2553423379715401023L;
/** internal cache to avoid short living ArrayList objects. */
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
new file mode 100644
index 0000000..14af830
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.hooks.ITaskHook;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.state.ISubscribedState;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import clojure.lang.Atom;
+
+/**
+ * {@link FlinkTopologyContext} is a {@link TopologyContext} that overrides certain methods that are not applicable when
+ * a Storm topology is executed within Flink.
+ */
+public final class FlinkTopologyContext extends TopologyContext {
+
+ /**
+ * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
+ * for each parallel task.
+ */
+ public FlinkTopologyContext(final StormTopology topology, @SuppressWarnings("rawtypes") final Map stormConf,
+ final Map<Integer, String> taskToComponent, final Map<String, List<Integer>> componentToSortedTasks,
+ final Map<String, Map<String, Fields>> componentToStreamToFields, final String stormId, final String codeDir,
+ final String pidDir, final Integer taskId, final Integer workerPort, final List<Integer> workerTasks,
+ final Map<String, Object> defaultResources, final Map<String, Object> userResources,
+ final Map<String, Object> executorData, @SuppressWarnings("rawtypes") final Map registeredMetrics,
+ final Atom openOrPrepareWasCalled) {
+ super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId,
+ codeDir, pidDir, taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
+ registeredMetrics, openOrPrepareWasCalled);
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public void addTaskHook(final ITaskHook hook) {
+ throw new UnsupportedOperationException("Task hooks are not supported by Flink");
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public Collection<ITaskHook> getHooks() {
+ throw new UnsupportedOperationException("Task hooks are not supported by Flink");
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public IMetric getRegisteredMetricByName(final String name) {
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
+
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) {
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public ReducedMetric registerMetric(final String name, final IReducer reducer, final int timeBucketSizeInSecs) {
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public <T extends ISubscribedState> T setAllSubscribedState(final T obj) {
+ throw new UnsupportedOperationException("Not supported by Flink");
+
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) {
+ throw new UnsupportedOperationException("Not supported by Flink");
+ }
+
+ /**
+ * Not supported by Flink.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T
+ obj) {
+ throw new UnsupportedOperationException("Not supported by Flink");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
new file mode 100644
index 0000000..200f772
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
+
+/**
+ * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
+ * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams.
+ * <p/>
+ * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
+ * {@link ArrayKeySelector} on it.
+ */
+public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
+ private static final long serialVersionUID = 4672434660037669254L;
+
+ private final ArrayKeySelector<Tuple> selector;
+
+ public SplitStreamTypeKeySelector(int... fields) {
+ this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
+ }
+
+ @Override
+ public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
+ return selector.getKey(value.value);
+ }
+
+}
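The delegation pattern used by SplitStreamTypeKeySelector (unwrap the payload, then apply a position-based selector to it) can be sketched without Flink dependencies. Everything below is hypothetical and for illustration only: Wrapped stands in for SplitStreamType, PositionKeySelector stands in for ArrayKeySelector, and records and keys are modeled as List<Object> instead of Flink Tuple types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitKeySelectorSketch {

    /** Hypothetical stand-in for SplitStreamType: a stream ID plus the wrapped record. */
    static final class Wrapped<T> {
        final String streamId;
        final T value;

        Wrapped(String streamId, T value) {
            this.streamId = streamId;
            this.value = value;
        }
    }

    /** Picks the configured positions out of a record (stand-in for ArrayKeySelector). */
    static final class PositionKeySelector {
        private final int[] fields;

        PositionKeySelector(int... fields) {
            this.fields = fields.clone();
        }

        List<Object> getKey(List<Object> record) {
            List<Object> key = new ArrayList<>(fields.length);
            for (int position : fields) {
                key.add(record.get(position));
            }
            return key;
        }
    }

    /** Unwraps the record and delegates to the position-based selector. */
    static final class WrappedKeySelector {
        private final PositionKeySelector selector;

        WrappedKeySelector(int... fields) {
            this.selector = new PositionKeySelector(fields);
        }

        List<Object> getKey(Wrapped<List<Object>> value) {
            return selector.getKey(value.value);
        }
    }

    public static void main(String[] args) {
        List<Object> record = Arrays.<Object>asList("hello", 3, 42L);
        Wrapped<List<Object>> wrapped = new Wrapped<>("stream-1", record);
        WrappedKeySelector selector = new WrappedKeySelector(0, 2);
        System.out.println(selector.getKey(wrapped)); // prints [hello, 42]
    }
}
```

Keeping the wrapper-aware selector as a thin adapter means the grouping logic itself stays in one place and the stream ID never influences the key.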
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index c531580..ccd29bb 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -20,6 +20,7 @@ package org.apache.flink.stormcompatibility.wrappers;
import java.util.Collection;
import java.util.HashMap;
+import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;
@@ -43,22 +44,16 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
private static final long serialVersionUID = 4993283609095408765L;
- /**
- * Number of attributes of the bolt's output tuples per stream.
- */
+ /** Number of attributes of the spout's output tuples per stream. */
private final HashMap<String, Integer> numberOfAttributes;
- /**
- * The wrapped Storm {@link IRichSpout spout}.
- */
+ /** The wrapped Storm {@link IRichSpout spout}. */
protected final IRichSpout spout;
- /**
- * The wrapper of the given Flink collector.
- */
+ /** The wrapper of the given Flink collector. */
protected StormSpoutCollector<OUT> collector;
- /**
- * Indicates, if the source is still running or was canceled.
- */
+ /** Indicates whether the source is still running or was canceled. */
protected volatile boolean isRunning = true;
+ /** The original Storm topology. */
+ protected StormTopology stormTopology;
/**
* Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
@@ -98,6 +93,16 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
}
+ /**
+ * Sets the original Storm topology.
+ *
+ * @param stormTopology
+ * The original Storm topology.
+ */
+ public void setStormTopology(StormTopology stormTopology) {
+ this.stormTopology = stormTopology;
+ }
+
@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
@@ -114,8 +119,11 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
}
this.spout.open(stormConfig,
- StormWrapperSetupHelper
- .convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
+ StormWrapperSetupHelper.createTopologyContext(
+ (StreamingRuntimeContext) super.getRuntimeContext(),
+ this.spout,
+ this.stormTopology,
+ null),
new SpoutOutputCollector(this.collector));
this.spout.activate();
this.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
deleted file mode 100644
index 58a4f7a..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * This interface represents a Storm spout that emits a finite number of records. Common Storm
- * spouts emit infinite streams by default. To change this behaviour and take advantage of
- * Flink's finite-source capabilities, the spout should implement this interface. To wrap
- * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
- */
-public interface FiniteStormSpout extends IRichSpout {
-
- /**
- * When returns true, the spout has reached the end of the stream.
- *
- * @return true, if the spout's stream reached its end, false otherwise
- */
- public boolean reachedEnd();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
index 1912afc..f499ecc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
import com.google.common.collect.Sets;
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
new file mode 100644
index 0000000..3cd27d4
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import java.util.HashMap;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+/**
+ * {@link SetupOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
+ * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
+ * method.
+ */
+class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+ /** The number of attributes for each declared stream by the wrapped operator. */
+ HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
+
+ @Override
+ public void declare(final Fields fields) {
+ this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+ }
+
+ @Override
+ public void declare(final boolean direct, final Fields fields) {
+ this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+ }
+
+ @Override
+ public void declareStream(final String streamId, final Fields fields) {
+ this.declareStream(streamId, false, fields);
+ }
+
+ @Override
+ public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+ if (streamId == null) {
+ throw new IllegalArgumentException("Stream ID cannot be null.");
+ }
+ if (direct) {
+ throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+ }
+
+ this.outputSchemas.put(streamId, fields.size());
+ }
+
+}
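As a rough illustration of what SetupOutputFieldsDeclarer records, the sketch below keeps only the number of attributes per declared stream and rejects null stream IDs and direct emit, mirroring the checks in the diff above. It is a simplified, hypothetical stand-in: SetupDeclarerSketch does not implement Storm's OutputFieldsDeclarer, field names are passed as plain strings instead of Fields, and the constant "default" is assumed to match Utils.DEFAULT_STREAM_ID.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Storm-free stand-in for SetupOutputFieldsDeclarer. */
final class SetupDeclarerSketch {

    /** Assumed to correspond to Storm's Utils.DEFAULT_STREAM_ID. */
    static final String DEFAULT_STREAM_ID = "default";

    /** The number of attributes for each declared stream. */
    final Map<String, Integer> outputSchemas = new HashMap<>();

    void declare(String... fields) {
        declareStream(DEFAULT_STREAM_ID, false, fields);
    }

    void declareStream(String streamId, boolean direct, String... fields) {
        if (streamId == null) {
            throw new IllegalArgumentException("Stream ID cannot be null.");
        }
        if (direct) {
            throw new UnsupportedOperationException("Direct emit is not supported by Flink");
        }
        // Only the attribute count is kept; field names are not needed at setup time.
        outputSchemas.put(streamId, fields.length);
    }

    public static void main(String[] args) {
        SetupDeclarerSketch declarer = new SetupDeclarerSketch();
        declarer.declare("word", "count");
        declarer.declareStream("errors", false, "message");
        System.out.println(declarer.outputSchemas.get("default")); // prints 2
        System.out.println(declarer.outputSchemas.get("errors"));  // prints 1
    }
}
```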
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index 6b58b0a..715d6df 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -19,6 +19,7 @@ package org.apache.flink.stormcompatibility.wrappers;
import java.util.Collection;
import java.util.HashMap;
+import backtype.storm.generated.StormTopology;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
@@ -59,6 +60,8 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
private final HashMap<String, Integer> numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
private final Fields inputSchema;
+ /** The original Storm topology. */
+ protected StormTopology stormTopology;
/**
* We have to use this because Operators must output
@@ -193,12 +196,22 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
}
+ /**
+ * Sets the original Storm topology.
+ *
+ * @param stormTopology
+ * The original Storm topology.
+ */
+ public void setStormTopology(StormTopology stormTopology) {
+ this.stormTopology = stormTopology;
+ }
+
@Override
public void open(final Configuration parameters) throws Exception {
super.open(parameters);
- final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
- super.runtimeContext, false);
+ final TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
+ super.runtimeContext, this.bolt, this.stormTopology, null);
flinkCollector = new TimestampedCollector<OUT>(output);
OutputCollector stormCollector = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
deleted file mode 100644
index f33d4d3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.HashMap;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
- * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
- * method.
- */
-class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
- /** The number of attributes for each declared stream by the wrapped operator. */
- HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
-
- @Override
- public void declare(final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
- }
-
- @Override
- public void declare(final boolean direct, final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
- }
-
- @Override
- public void declareStream(final String streamId, final Fields fields) {
- this.declareStream(streamId, false, fields);
- }
-
- @Override
- public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if (streamId == null) {
- throw new IllegalArgumentException("Stream ID cannot be null.");
- }
- if (direct) {
- throw new UnsupportedOperationException("Direct emit is not supported by Flink");
- }
-
- this.outputSchemas.put(streamId, fields.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
index 75ab8e0..891497e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -14,32 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.Config;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.StreamInfo;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IComponent;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
+import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import clojure.lang.Atom;
+
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
- * {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or
+ * {@link StormWrapperSetupHelper} is a helper class used by {@link AbstractStormSpoutWrapper} and
* {@link StormBoltWrapper}.
*/
class StormWrapperSetupHelper {
+ /** The configuration key for the topology name. */
+ static final String TOPOLOGY_NAME = "storm.topology.name";
+
/**
* Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper}
* per declared output stream. The number is {@code -1} for raw output type or a value within range [0;25] for
@@ -60,7 +71,7 @@ class StormWrapperSetupHelper {
public static HashMap<String, Integer> getNumberOfAttributes(final IComponent spoutOrBolt,
final Collection<String> rawOutputs)
throws IllegalArgumentException {
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
spoutOrBolt.declareOutputFields(declarer);
for (Entry<String, Integer> schema : declarer.outputSchemas.entrySet()) {
@@ -84,27 +95,174 @@ class StormWrapperSetupHelper {
return declarer.outputSchemas;
}
- // TODO
- public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context,
- final boolean spoutOrBolt) {
- final Integer taskId = new Integer(1 + context.getIndexOfThisSubtask());
+ /** Used to compute unique task IDs for a Storm topology. */
+ private static int tid;
+
+ /**
+ * Creates a {@link TopologyContext} for a Spout or Bolt instance (i.e., Flink task / Storm executor).
+ *
+ * @param context
+ * The Flink runtime context.
+ * @param spoutOrBolt
+ * The Spout or Bolt this context is created for.
+ * @param stormTopology
+ * The original Storm topology.
+ * @param stormConfig
+ * The user provided configuration.
+ * @return The created {@link TopologyContext}.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static synchronized TopologyContext createTopologyContext(
+ final StreamingRuntimeContext context, final IComponent spoutOrBolt,
+ StormTopology stormTopology, Map stormConfig) {
+ String operatorName = context.getTaskName();
+ if (operatorName.startsWith("Source: ")) {
+ // prefix "Source: " is inserted by Flink sources by default -- need to get rid of it here
+ operatorName = operatorName.substring(8);
+ }
+ final int dop = context.getNumberOfParallelSubtasks();
final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
- taskToComponents.put(taskId, context.getTaskName());
+ final Map<String, List<Integer>> componentToSortedTasks = new HashMap<String, List<Integer>>();
+ final Map<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>();
+ String stormId = (String) stormConfig.get(TOPOLOGY_NAME);
+ String codeDir = null; // not supported
+ String pidDir = null; // not supported
+ Integer taskId = null;
+ Integer workerPort = null; // not supported
+ List<Integer> workerTasks = new ArrayList<Integer>();
+ final Map<String, Object> defaultResources = new HashMap<String, Object>();
+ final Map<String, Object> userResources = new HashMap<String, Object>();
+ final Map<String, Object> executorData = new HashMap<String, Object>();
+ final Map registeredMetrics = new HashMap();
+ Atom openOrPrepareWasCalled = null;
- final ComponentCommon common = new ComponentCommon();
- common.set_parallelism_hint(context.getNumberOfParallelSubtasks());
+ if (stormTopology == null) {
+ // embedded mode
+ ComponentCommon common = new ComponentCommon();
+ common.set_parallelism_hint(dop);
- final Map<String, Bolt> bolts = new HashMap<String, Bolt>();
- final Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
+ HashMap<String, SpoutSpec> spouts = new HashMap<String, SpoutSpec>();
+ HashMap<String, Bolt> bolts = new HashMap<String, Bolt>();
+ if (spoutOrBolt instanceof IRichSpout) {
+ spouts.put(operatorName, new SpoutSpec(null, common));
+ } else {
+ assert (spoutOrBolt instanceof IRichBolt);
+ bolts.put(operatorName, new Bolt(null, common));
+ }
+ stormTopology = new StormTopology(spouts, bolts, new HashMap<String, StateSpoutSpec>());
- if (spoutOrBolt) {
- spoutSpecs.put(context.getTaskName(), new SpoutSpec(null, common));
+ taskId = context.getIndexOfThisSubtask() + 1; // task IDs registered below are 1-based
+
+ List<Integer> sortedTasks = new ArrayList<Integer>(dop);
+ for (int i = 1; i <= dop; ++i) {
+ taskToComponents.put(i, operatorName);
+ sortedTasks.add(i);
+ }
+ componentToSortedTasks.put(operatorName, sortedTasks);
+
+ FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+ spoutOrBolt.declareOutputFields(declarer);
+ componentToStreamToFields.put(operatorName, declarer.outputStreams);
} else {
- bolts.put(context.getTaskName(), new Bolt(null, common));
+ // whole topology is built (ie, FlinkTopologyBuilder is used)
+ Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
+ Map<String, Bolt> bolts = stormTopology.get_bolts();
+ Map<String, StateSpoutSpec> stateSpouts = stormTopology.get_state_spouts();
+
+ tid = 1;
+
+ for (Entry<String, SpoutSpec> spout : spouts.entrySet()) {
+ Integer rc = processSingleOperator(spout.getKey(), spout.getValue().get_common(),
+ operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
+ componentToSortedTasks, componentToStreamToFields);
+ if (rc != null) {
+ taskId = rc;
+ }
+ }
+ for (Entry<String, Bolt> bolt : bolts.entrySet()) {
+ Integer rc = processSingleOperator(bolt.getKey(), bolt.getValue().get_common(),
+ operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
+ componentToSortedTasks, componentToStreamToFields);
+ if (rc != null) {
+ taskId = rc;
+ }
+ }
+ for (Entry<String, StateSpoutSpec> stateSpout : stateSpouts.entrySet()) {
+ Integer rc = processSingleOperator(stateSpout.getKey(), stateSpout
+ .getValue().get_common(), operatorName, context.getIndexOfThisSubtask(),
+ dop, taskToComponents, componentToSortedTasks, componentToStreamToFields);
+ if (rc != null) {
+ taskId = rc;
+ }
+ }
+ assert (taskId != null);
+ }
+
+ if (!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+ stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // Storm default value
+ }
+
+ return new FlinkTopologyContext(stormTopology, stormConfig, taskToComponents,
+ componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir,
+ taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
+ registeredMetrics, openOrPrepareWasCalled);
+ }
+
+ /**
+ * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, and {@code componentToStreamToFields} for a
+ * single instance of a Spout or Bolt (i.e., task or executor). Furthermore, it computes the unique task ID.
+ *
+ * @param componentId
+ * The ID of the Spout/Bolt in the topology.
+ * @param common
+ * The common operator object (that all Spouts and Bolts have).
+ * @param operatorName
+ * The Flink operator name.
+ * @param index
+ * The index of the currently processed task within its operator.
+ * @param dop
+ * The parallelism of the operator.
+ * @param taskToComponents
+ * OUTPUT: A map from all task IDs of the topology to their component IDs.
+ * @param componentToSortedTasks
+ * OUTPUT: A map from all component IDs to their sorted list of corresponding task IDs.
+ * @param componentToStreamToFields
+ * OUTPUT: A map from all component IDs to their output streams and output fields.
+ *
+ * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current
+ * Flink operator ({@code operatorName}) -- {@code null} otherwise.
+ */
+ private static Integer processSingleOperator(final String componentId,
+ final ComponentCommon common, final String operatorName, final int index,
+ final int dop, final Map<Integer, String> taskToComponents,
+ final Map<String, List<Integer>> componentToSortedTasks,
+ final Map<String, Map<String, Fields>> componentToStreamToFields) {
+ final int parallelism_hint = common.get_parallelism_hint();
+ Integer taskId = null;
+
+ if (componentId.equals(operatorName)) {
+ taskId = tid + index;
+ }
+
+ List<Integer> sortedTasks = new ArrayList<Integer>(parallelism_hint);
+ for (int i = 0; i < parallelism_hint; ++i) {
+ taskToComponents.put(tid, componentId);
+ sortedTasks.add(tid);
+ ++tid;
+ }
+ componentToSortedTasks.put(componentId, sortedTasks);
+
+ if (componentId.equals(operatorName)) {
+ }
+
+ Map<String, Fields> outputStreams = new HashMap<String, Fields>();
+ for(Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) {
+ outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields()));
}
+ componentToStreamToFields.put(componentId, outputStreams);
- return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
+ return taskId;
}
}
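The task numbering performed by createTopologyContext and processSingleOperator above can be illustrated in isolation. The following is a minimal sketch under stated assumptions, not the Flink implementation: TaskIdSketch, Assignment and assign are hypothetical names, and a plain map from component ID to parallelism stands in for the Storm topology objects. It shows how consecutive task IDs starting at 1 are handed out per component, and how the current task's own ID is derived from the first ID of its component plus its zero-based subtask index.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; none of these names exist in Flink or Storm.
final class TaskIdSketch {

    /** Result of numbering all tasks of a topology. */
    static final class Assignment {
        final Map<Integer, String> taskToComponent = new LinkedHashMap<Integer, String>();
        final Map<String, List<Integer>> componentToSortedTasks =
                new LinkedHashMap<String, List<Integer>>();
        Integer ownTaskId = null; // stays null if operatorName matches no component
    }

    /**
     * @param parallelism  component ID -> number of parallel tasks (iteration order matters)
     * @param operatorName component the current task belongs to
     * @param subtaskIndex zero-based index of the current task within its component
     */
    static Assignment assign(Map<String, Integer> parallelism, String operatorName,
            int subtaskIndex) {
        Assignment result = new Assignment();
        int nextId = 1; // task IDs start at 1, as in the diff above
        for (Map.Entry<String, Integer> component : parallelism.entrySet()) {
            if (component.getKey().equals(operatorName)) {
                // first ID of this component plus the zero-based subtask index
                result.ownTaskId = nextId + subtaskIndex;
            }
            List<Integer> sortedTasks = new ArrayList<Integer>(component.getValue());
            for (int i = 0; i < component.getValue(); ++i) {
                result.taskToComponent.put(nextId, component.getKey());
                sortedTasks.add(nextId);
                ++nextId;
            }
            result.componentToSortedTasks.put(component.getKey(), sortedTasks);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> parallelism = new LinkedHashMap<String, Integer>();
        parallelism.put("spout", 2);
        parallelism.put("bolt", 3);
        Assignment a = assign(parallelism, "bolt", 1);
        System.out.println(a.ownTaskId);              // prints 4
        System.out.println(a.componentToSortedTasks); // prints {spout=[1, 2], bolt=[3, 4, 5]}
    }
}
```

The sketch uses a LinkedHashMap so that the numbering is reproducible. With an unordered map the IDs would depend on iteration order, so every task would have to iterate over the same map contents in the same order to agree on the numbering.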
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
deleted file mode 100644
index 08ac60b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.LinkedList;
-
-public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
-
-
-
- @Test
- public void testNull() {
- Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
- }
-
- @Test
- public void testDeclare() {
- for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
- for (int j = 1; j < 2; ++j) { // number of streams
- for (int k = 0; k <= 25; ++k) { // number of attributes
- this.runDeclareTest(i, j, k);
- }
- }
- }
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDeclareSimpleToManyAttributes() {
- this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDeclareNonDirectToManyAttributes() {
- this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDeclareDefaultStreamToManyAttributes() {
- this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDeclareFullToManyAttributes() {
- this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
- }
-
- private void runDeclareTest(final int testCase, final int numberOfStreams,
- final int numberOfAttributes) {
- final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-
- String[] streams = null;
- if (numberOfStreams > 1 || r.nextBoolean()) {
- streams = new String[numberOfStreams];
- for (int i = 0; i < numberOfStreams; ++i) {
- streams[i] = "stream" + i;
- }
- }
-
- final String[] attributes = new String[numberOfAttributes];
- for (int i = 0; i < attributes.length; ++i) {
- attributes[i] = "a" + i;
- }
-
- switch (testCase) {
- case 0:
- this.declareSimple(declarer, streams, attributes);
- break;
- default:
- this.declareNonDirect(declarer, streams, attributes);
- }
-
- if (streams == null) {
- streams = new String[] { Utils.DEFAULT_STREAM_ID };
- }
-
- for (String stream : streams) {
- final TypeInformation<?> type = declarer.getOutputType(stream);
-
- if (numberOfAttributes == 1) {
- Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
- Assert.assertEquals(type.getTypeClass(), Object.class);
- } else {
- Assert.assertEquals(numberOfAttributes, type.getArity());
- Assert.assertTrue(type.isTupleType());
- }
- }
- }
-
- private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
- final String[] attributes) {
-
- if (streams != null) {
- for (String stream : streams) {
- declarer.declareStream(stream, new Fields(attributes));
- }
- } else {
- declarer.declare(new Fields(attributes));
- }
- }
-
- private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
- final String[] attributes) {
-
- if (streams != null) {
- for (String stream : streams) {
- declarer.declareStream(stream, false, new Fields(attributes));
- }
- } else {
- declarer.declare(false, new Fields(attributes));
- }
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUndeclared() {
- final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
- declarer.getOutputType("unknownStreamId");
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeclareDirect() {
- new FlinkOutputFieldsDeclarer().declare(true, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeclareDirect2() {
- new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
- }
-
- @Test
- public void testGetGroupingFieldIndexes() {
- final int numberOfAttributes = 5 + this.r.nextInt(21);
- final String[] attributes = new String[numberOfAttributes];
- for (int i = 0; i < numberOfAttributes; ++i) {
- attributes[i] = "a" + i;
- }
-
- final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
- declarer.declare(new Fields(attributes));
-
- final int numberOfKeys = 1 + this.r.nextInt(25);
- final LinkedList<String> groupingFields = new LinkedList<String>();
- final boolean[] indexes = new boolean[numberOfAttributes];
-
- for (int i = 0; i < numberOfAttributes; ++i) {
- if (this.r.nextInt(26) < numberOfKeys) {
- groupingFields.add(attributes[i]);
- indexes[i] = true;
- } else {
- indexes[i] = false;
- }
- }
-
- final int[] expectedResult = new int[groupingFields.size()];
- int j = 0;
- for (int i = 0; i < numberOfAttributes; ++i) {
- if (indexes[i]) {
- expectedResult[j++] = i;
- }
- }
-
- final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
- groupingFields);
-
- Assert.assertEquals(expectedResult.length, result.length);
- for (int i = 0; i < expectedResult.length; ++i) {
- Assert.assertEquals(expectedResult[i], result[i]);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
deleted file mode 100644
index d214610..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import org.junit.Test;
-
-public class FlinkTopologyContextTest {
-
- @Test(expected = UnsupportedOperationException.class)
- public void testAddTaskHook() {
- new FlinkTopologyContext(null, null, null).addTaskHook(null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testGetHooks() {
- new FlinkTopologyContext(null, null, null).getHooks();
- }
-
- @SuppressWarnings("rawtypes")
- @Test(expected = UnsupportedOperationException.class)
- public void testRegisteredMetric1() {
- new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner) null, 0);
- }
-
- @SuppressWarnings("rawtypes")
- @Test(expected = UnsupportedOperationException.class)
- public void testRegisteredMetric2() {
- new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer) null, 0);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testRegisteredMetric3() {
- new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric) null, 0);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testGetRegisteredMetricByName() {
- new FlinkTopologyContext(null, null, null).getRegisteredMetricByName(null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSetAllSubscribedState() {
- new FlinkTopologyContext(null, null, null).setAllSubscribedState(null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSetSubscribedState1() {
- new FlinkTopologyContext(null, null, null).setSubscribedState(null, null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSetSubscribedState2() {
- new FlinkTopologyContext(null, null, null).setSubscribedState(null, null, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
index f179919..c98c9a3 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
@@ -24,23 +24,23 @@ public class FlinkTopologyTest {
@Test
public void testDefaultParallelism() {
- final FlinkTopology topology = new FlinkTopology(null);
+ final FlinkTopology topology = new FlinkTopology();
Assert.assertEquals(1, topology.getParallelism());
}
@Test(expected = UnsupportedOperationException.class)
public void testExecute() throws Exception {
- new FlinkTopology(null).execute();
+ new FlinkTopology().execute();
}
@Test(expected = UnsupportedOperationException.class)
public void testExecuteWithName() throws Exception {
- new FlinkTopology(null).execute(null);
+ new FlinkTopology().execute(null);
}
@Test
public void testNumberOfTasks() {
- final FlinkTopology topology = new FlinkTopology(null);
+ final FlinkTopology topology = new FlinkTopology();
Assert.assertEquals(0, topology.getNumberOfTasks());
@@ -56,7 +56,7 @@ public class FlinkTopologyTest {
@Test(expected = AssertionError.class)
public void testAssert() {
- new FlinkTopology(null).increaseNumberOfTasks(0);
+ new FlinkTopology().increaseNumberOfTasks(0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
new file mode 100644
index 0000000..f664e58
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.generated.StormTopology;
+
+public class TestTopologyBuilder extends FlinkTopologyBuilder {
+ @Override
+ public StormTopology getStormTopology() {
+ return super.getStormTopology();
+ }
+}
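TestTopologyBuilder above only overrides getStormTopology() with a public method that delegates to super, presumably so that tests outside the builder's package can reach an accessor that is not public in FlinkTopologyBuilder. A minimal sketch of that pattern follows; BuilderSketch and TestBuilderSketch are hypothetical stand-ins, and the protected visibility of the base method is an assumption made for illustration.

```java
// Hypothetical stand-ins; these are not the real Flink classes.
class BuilderSketch {
    private final String topology = "topology";

    // Assumed non-public: callers outside the package and class hierarchy cannot use it.
    protected String getTopology() {
        return topology;
    }
}

class TestBuilderSketch extends BuilderSketch {
    // Java lets an override widen visibility (here protected -> public),
    // so test code anywhere can call the accessor through this subclass.
    @Override
    public String getTopology() {
        return super.getTopology();
    }
}

class VisibilityDemo {
    public static void main(String[] args) {
        System.out.println(new TestBuilderSketch().getTopology()); // prints topology
    }
}
```

Keeping the widening subclass in the test source tree leaves the production API unchanged.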