You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/21 19:36:05 UTC
[2/4] flink git commit: [FLINK-2306] Add support for named streams in
Storm compatibility layer - enabled .declareStream() and connect via stream
name - enabled multiple output streams - added .split() / .select() / strip
pattern - added helpers in n
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
index 44b3f68..45eb56c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
@@ -17,17 +17,22 @@
package org.apache.flink.stormcompatibility.wrappers;
+import java.util.Collection;
+
import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
+import com.google.common.collect.Sets;
+
/**
- * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls
- * {@link IRichSpout#nextTuple() nextTuple()} for finite number of times before
- * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)}
- * returns. The number of {@code nextTuple()} calls can be specified as a certain number of
- * invocations or can be undefined. In the undefined case, the {@code run(...)} method return if no
- * record was emitted to the output collector for the first time.
+ * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls {@link IRichSpout#nextTuple()
+ * nextTuple()} for finite number of times before
+ * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)} returns. The number of
+ * {@code nextTuple()} calls can be specified as a certain number of invocations or can be undefined. In the undefined
+ * case, the {@code run(...)} method returns if no record was emitted to the output collector for the first time.
*/
public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
private static final long serialVersionUID = 3883246587044801286L;
@@ -38,79 +43,126 @@ public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT>
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
* method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
- * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+ * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of
* attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [0;25].
*/
public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
- this(spout, false, -1);
+ this(spout, (Collection<String>) null, -1);
}
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be
- * one
- * of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of attributes.
- *
+ * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one
+ * of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of attributes.
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @param numberOfInvocations
- * The number of calls to {@link IRichSpout#nextTuple()}.
+ * The number of calls to {@link IRichSpout#nextTuple()}.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [0;25].
*/
public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations)
throws IllegalArgumentException {
- this(spout, false, numberOfInvocations);
+ this(spout, (Collection<String>) null, numberOfInvocations);
+ }
+
+ /**
+ * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+ * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
+ * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
+ * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
+ * {@link Tuple25} depending on the spout's declared number of attributes.
+ *
+ * @param spout
+ * The Storm {@link IRichSpout spout} to be used.
+ * @param rawOutputs
+ * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
+ * @throws IllegalArgumentException
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [0;25].
+ */
+ public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
+ throws IllegalArgumentException {
+ this(spout, Sets.newHashSet(rawOutputs), -1);
}
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
* method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
* output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
- * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
+ * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
* {@link Tuple25} depending on the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * The Storm {@link IRichSpout spout} to be used.
+ * @param rawOutputs
+ * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [0;25].
*/
- public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
- this(spout, rawOutput, -1);
+ public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs)
+ throws IllegalArgumentException {
+ this(spout, rawOutputs, -1);
}
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
* method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
* type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
- * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on
+ * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on
* the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * The Storm {@link IRichSpout spout} to be used.
+ * @param rawOutputs
+ * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @param numberOfInvocations
- * The number of calls to {@link IRichSpout#nextTuple()}.
+ * The number of calls to {@link IRichSpout#nextTuple()}.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [0;25].
*/
- public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput, final int numberOfInvocations)
- throws IllegalArgumentException {
- super(spout, rawOutput);
+ public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
+ final int numberOfInvocations) throws IllegalArgumentException {
+ super(spout, Sets.newHashSet(rawOutputs));
+ this.numberOfInvocations = numberOfInvocations;
+ }
+
+ /**
+ * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+ * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
+ * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
+ * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on
+ * the spout's declared number of attributes.
+ *
+ * @param spout
+ * The Storm {@link IRichSpout spout} to be used.
+ * @param rawOutputs
+ * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
+ * @param numberOfInvocations
+ * The number of calls to {@link IRichSpout#nextTuple()}.
+ * @throws IllegalArgumentException
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [0;25].
+ */
+ public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs,
+ final int numberOfInvocations) throws IllegalArgumentException {
+ super(spout, rawOutputs);
this.numberOfInvocations = numberOfInvocations;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
index 231cab6..f33d4d3 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
@@ -17,18 +17,21 @@
package org.apache.flink.stormcompatibility.wrappers;
+import java.util.HashMap;
+
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
/**
- * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the
- * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)} method.
+ * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
+ * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
+ * method.
*/
class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
- /** The output schema declared by the wrapped bolt. */
- private Fields outputSchema = null;
+ /** The number of attributes for each declared stream by the wrapped operator. */
+ HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
@Override
public void declare(final Fields fields) {
@@ -47,28 +50,14 @@ class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
@Override
public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
- throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
+ if (streamId == null) {
+ throw new IllegalArgumentException("Stream ID cannot be null.");
}
if (direct) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}
- this.outputSchema = fields;
- }
-
- /**
- * Returns the number of attributes of the output schema declare by the wrapped bolt. If no output schema is
- * declared (eg, for sink bolts), {@code -1} is returned.
- *
- * @return the number of attributes of the output schema declare by the wrapped bolt
- */
- public int getNumberOfAttributes() {
- if (this.outputSchema != null) {
- return this.outputSchema.size();
- }
-
- return -1;
+ this.outputSchemas.put(streamId, fields.size());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
index 09a7ac7..5a20056 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
@@ -19,9 +19,11 @@ package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.spout.ISpoutOutputCollector;
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+import java.util.HashMap;
import java.util.List;
/**
@@ -31,23 +33,23 @@ import java.util.List;
*/
class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
- // The Flink source context object
+ /** The Flink source context object */
private final SourceContext<OUT> flinkContext;
/**
- * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink
- * source context. If the number of attributes is specified as zero, any output type is
- * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
- * to {@link Tuple25}.
+ * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink source context. If the
+ * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
+ * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
*
* @param numberOfAttributes
- * The number of attributes of the emitted tuples.
+ * The number of attributes of the emitted tuples.
* @param flinkContext
- * The Flink source context to be used.
+ * The Flink source context to be used.
* @throws UnsupportedOperationException
- * if the specified number of attributes is not in the valid range of [0,25]
+ * if the specified number of attributes is greater than 25
*/
- public StormSpoutCollector(final int numberOfAttributes, final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
+ public StormSpoutCollector(final HashMap<String, Integer> numberOfAttributes,
+ final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
super(numberOfAttributes);
assert (flinkContext != null);
this.flinkContext = flinkContext;
@@ -68,7 +70,7 @@ class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements IS
@Override
public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
- return this.transformAndEmit(tuple);
+ return this.tansformAndEmit(streamId, tuple);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
index ab9a890..300b241 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
@@ -17,10 +17,16 @@
package org.apache.flink.stormcompatibility.wrappers;
+import java.util.Collection;
+
import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
+import com.google.common.collect.Sets;
+
/**
* A {@link StormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped spout's
* {@link IRichSpout#nextTuple() nextTuple()} method in in infinite loop.
@@ -29,39 +35,61 @@ public class StormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
private static final long serialVersionUID = -218340336648247605L;
/**
- * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
- * can
- * be used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
+ * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+ * be used within a Flink streaming program. The output type will be one of {@link Tuple0} to {@link Tuple25}
* depending on the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [0;25].
*/
public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
- super(spout, false);
+ super(spout, null);
+ }
+
+ /**
+ * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+ * be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
+ * {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
+ * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of
+ * attributes.
+ *
+ * @param spout
+ * The Storm {@link IRichSpout spout} to be used.
+ * @param rawOutputs
+ * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type. (Can be {@code null}.)
+ * @throws IllegalArgumentException
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [0;25].
+ */
+ public StormSpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
+ throws IllegalArgumentException {
+ super(spout, Sets.newHashSet(rawOutputs));
}
/**
- * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
- * can be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
+ * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+ * be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
- * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+ * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of
* attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * The Storm {@link IRichSpout spout} to be used.
+ * @param rawOutputs
+ * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type. (Can be {@code null}.)
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [0;25].
*/
- public StormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
- super(spout, rawOutput);
+ public StormSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs)
+ throws IllegalArgumentException {
+ super(spout, rawOutputs);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
index e2e303a..75ab8e0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -25,11 +25,14 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IComponent;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
+
import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
/**
* {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or
@@ -38,52 +41,47 @@ import java.util.Map;
class StormWrapperSetupHelper {
/**
- * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or
- * {@link StormBoltWrapper}. Returns zero for raw output type or a value within range [1;25] for
- * output type {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} to
- * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25} . In case of a data sink, {@code -1}
- * is returned.
+ * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper}
+ * per declared output stream. The number is {@code -1} for raw output type or a value within range [0;25] for
+ * output type {@link org.apache.flink.api.java.tuple.Tuple0 Tuple0} to
+ * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}.
*
* @param spoutOrBolt
- * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type
- * {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but be of a raw type.
- * @return The number of attributes to be used.
+ * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
+ * @param rawOutputs
+ * Contains stream names if a single attribute output stream, should not be of type
+ * {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but be of a raw type. (Can be {@code null}.)
+ * @return The number of attributes to be used for each stream.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output
- * attributes is not 1 or if {@code rawOuput} is {@code false} and the number
- * of declared output attributes is not with range [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [0;25].
*/
- public static int getNumberOfAttributes(final IComponent spoutOrBolt, final boolean rawOutput)
- throws IllegalArgumentException {
+ public static HashMap<String, Integer> getNumberOfAttributes(final IComponent spoutOrBolt,
+ final Collection<String> rawOutputs)
+ throws IllegalArgumentException {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
spoutOrBolt.declareOutputFields(declarer);
- final int declaredNumberOfAttributes = declarer.getNumberOfAttributes();
-
- if (declaredNumberOfAttributes == -1) {
- return -1;
- }
-
- if ((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) {
- throw new IllegalArgumentException(
- "Provided bolt declares non supported number of output attributes. Must be in range [1;25] but " +
- "was "
- + declaredNumberOfAttributes);
- }
-
- if (rawOutput) {
- if (declaredNumberOfAttributes > 1) {
+ for (Entry<String, Integer> schema : declarer.outputSchemas.entrySet()) {
+ int declaredNumberOfAttributes = schema.getValue();
+ if ((declaredNumberOfAttributes < 0) || (declaredNumberOfAttributes > 25)) {
throw new IllegalArgumentException(
- "Ouput type is requested to be raw type, but provided bolt declares more then one output " +
- "attribute.");
+ "Provided bolt declares non supported number of output attributes. Must be in range [0;25] but "
+ + "was " + declaredNumberOfAttributes);
+ }
+ if (rawOutputs != null && rawOutputs.contains(schema.getKey())) {
+ if (declaredNumberOfAttributes != 1) {
+ throw new IllegalArgumentException(
+ "Ouput type is requested to be raw type, but provided bolt declares more then one output "
+ + "attribute.");
+ }
+ schema.setValue(-1);
}
- return 0;
}
- return declaredNumberOfAttributes;
+ return declarer.outputSchemas;
}
// TODO
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
index 58d81f9..08ac60b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.stormcompatibility.api;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
@@ -31,84 +32,111 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
@Test
+ public void testNull() {
+ Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
+ }
+
+ @Test
public void testDeclare() {
- for (int i = 0; i < 4; ++i) {
- for (int j = 0; j <= 25; ++j) {
- this.runDeclareTest(i, j);
+ for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
+ for (int j = 1; j < 2; ++j) { // number of streams
+ for (int k = 0; k <= 25; ++k) { // number of attributes
+ this.runDeclareTest(i, j, k);
+ }
}
}
}
@Test(expected = IllegalArgumentException.class)
public void testDeclareSimpleToManyAttributes() {
- this.runDeclareTest(0, 26);
+ this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
}
@Test(expected = IllegalArgumentException.class)
public void testDeclareNonDirectToManyAttributes() {
- this.runDeclareTest(1, 26);
+ this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
}
@Test(expected = IllegalArgumentException.class)
public void testDeclareDefaultStreamToManyAttributes() {
- this.runDeclareTest(2, 26);
+ this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
}
@Test(expected = IllegalArgumentException.class)
public void testDeclareFullToManyAttributes() {
- this.runDeclareTest(3, 26);
+ this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26); // NOTE(review): runDeclareTest only separates case 0 from "default", so case 3 runs the same non-direct path as case 1
}
- private void runDeclareTest(final int testCase, final int numberOfAttributes) {
+ private void runDeclareTest(final int testCase, final int numberOfStreams,
+ final int numberOfAttributes) {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+ String[] streams = null; // null means: declare via declare(...) instead of declareStream(...)
+ if (numberOfStreams > 1 || r.nextBoolean()) { // a single stream is randomly declared by name or left as null
+ streams = new String[numberOfStreams];
+ for (int i = 0; i < numberOfStreams; ++i) {
+ streams[i] = "stream" + i;
+ }
+ }
+
final String[] attributes = new String[numberOfAttributes];
- for (int i = 0; i < numberOfAttributes; ++i) {
+ for (int i = 0; i < attributes.length; ++i) {
attributes[i] = "a" + i;
}
switch (testCase) {
- case 0:
- this.declareSimple(declarer, attributes);
- break;
- case 1:
- this.declareNonDirect(declarer, attributes);
- break;
- case 2:
- this.declareDefaultStream(declarer, attributes);
- break;
- default:
- this.declareFull(declarer, attributes);
+ case 0:
+ this.declareSimple(declarer, streams, attributes);
+ break;
+ default:
+ this.declareNonDirect(declarer, streams, attributes);
+ }
+ // Without explicit stream names the output type is looked up under the default stream ID.
+ if (streams == null) {
+ streams = new String[] { Utils.DEFAULT_STREAM_ID };
}
- final TypeInformation<?> type = declarer.getOutputType();
+ for (String stream : streams) {
+ final TypeInformation<?> type = declarer.getOutputType(stream);
- if (numberOfAttributes == 0) {
- Assert.assertNull(type);
- } else {
- Assert.assertEquals(numberOfAttributes, type.getArity());
if (numberOfAttributes == 1) {
- Assert.assertFalse(type.isTupleType());
+ Assert.assertEquals(GenericTypeInfo.class, type.getClass()); // JUnit argument order: expected, actual
+ Assert.assertEquals(Object.class, type.getTypeClass()); // JUnit argument order: expected, actual
} else {
+ Assert.assertEquals(numberOfAttributes, type.getArity());
Assert.assertTrue(type.isTupleType());
}
}
}
- private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
- declarer.declare(new Fields(attributes));
- }
+ private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+ final String[] attributes) { // streams == null selects the plain declare(...) call in the else branch
- private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
- declarer.declare(false, new Fields(attributes));
+ if (streams != null) {
+ for (String stream : streams) {
+ declarer.declareStream(stream, new Fields(attributes)); // every named stream gets the same schema
+ }
+ } else {
+ declarer.declare(new Fields(attributes));
+ }
}
- private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
- declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes));
+ private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+ final String[] attributes) {
+ // Same as declareSimple, but uses the overloads that take an explicit "direct" flag (always false here).
+ if (streams != null) {
+ for (String stream : streams) {
+ declarer.declareStream(stream, false, new Fields(attributes));
+ }
+ } else {
+ declarer.declare(false, new Fields(attributes));
+ }
}
- private void declareFull(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
- declarer.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes));
+ @Test(expected = IllegalArgumentException.class)
+ public void testUndeclared() {
+ final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); // fresh declarer: nothing has been declared on it
+ declarer.getOutputType("unknownStreamId"); // looking up a never-declared stream ID is expected to throw
}
@Test(expected = UnsupportedOperationException.class)
@@ -117,20 +145,10 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
}
@Test(expected = UnsupportedOperationException.class)
- public void testDeclareNonDefaultStrem() {
- new FlinkOutputFieldsDeclarer().declareStream("dummy", null);
- }
-
- @Test(expected = UnsupportedOperationException.class)
public void testDeclareDirect2() {
new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
}
- @Test(expected = UnsupportedOperationException.class)
- public void testDeclareNonDefaultStrem2() {
- new FlinkOutputFieldsDeclarer().declareStream("dummy", this.r.nextBoolean(), null);
- }
-
@Test
public void testGetGroupingFieldIndexes() {
final int numberOfAttributes = 5 + this.r.nextInt(21);
@@ -163,7 +181,8 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
}
}
- final int[] result = declarer.getGroupingFieldIndexes(groupingFields);
+ final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
+ groupingFields);
Assert.assertEquals(expectedResult.length, result.length);
for (int i = 0; i < expectedResult.length; ++i) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
new file mode 100644
index 0000000..0187020
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import org.junit.Test;
+
+public class FlinkTopologyBuilderTest {
+ // Each test wires up an invalid topology and passes only if a RuntimeException is thrown.
+ @Test(expected = RuntimeException.class)
+ public void testUnknowSpout() {
+ FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+ builder.setSpout("spout", new TestSpout());
+ builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); // "unknown" was never registered; only "spout" exists
+ builder.createTopology();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testUnknowBolt() {
+ FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+ builder.setSpout("spout", new TestSpout());
+ builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
+ builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); // no component with ID "unknown" was registered
+ builder.createTopology();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testUndeclaredStream() {
+ FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+ builder.setSpout("spout", new TestSpout());
+ builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); // "spout" exists; NOTE(review): presumably fails because TestSpout declares no output stream -- confirm
+ builder.createTopology();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
new file mode 100644
index 0000000..2e4a534
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+public class TestBolt implements IRichBolt {
+ private static final long serialVersionUID = -667148827441397683L;
+ // No-op bolt stub: every callback below has an empty body and no output stream is declared.
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+
+ @Override
+ public void execute(Tuple input) {} // input tuples are ignored
+
+ @Override
+ public void cleanup() {}
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {} // declares no streams at all
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null; // supplies no configuration map
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
new file mode 100644
index 0000000..146218f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+public class TestSpout implements IRichSpout {
+ private static final long serialVersionUID = -4884029383198924007L;
+ // No-op spout stub: every callback below has an empty body and no output stream is declared.
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void activate() {}
+
+ @Override
+ public void deactivate() {}
+
+ @Override
+ public void nextTuple() {} // never emits a tuple
+
+ @Override
+ public void ack(Object msgId) {}
+
+ @Override
+ public void fail(Object msgId) {}
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {} // declares no streams at all
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null; // supplies no configuration map
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
new file mode 100644
index 0000000..c0a6ed3
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+import java.util.Iterator;
+
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlinkStormStreamSelectorTest {
+ // Checks that select(...) returns exactly one output name, equal to the tuple's current streamId.
+ @Test
+ public void testSelector() {
+ FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
+ SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+ Iterator<String> result;
+ // A tuple tagged "stream1" must be mapped to exactly that one name.
+ tuple.streamId = "stream1";
+ result = selector.select(tuple).iterator();
+ Assert.assertEquals("stream1", result.next());
+ Assert.assertFalse(result.hasNext());
+ // Re-tagging the same tuple instance must change the selection accordingly.
+ tuple.streamId = "stream2";
+ result = selector.select(tuple).iterator();
+ Assert.assertEquals("stream2", result.next());
+ Assert.assertFalse(result.hasNext());
+ // Going back to "stream1" must yield "stream1" again (NOTE(review): presumably guards against stale cached results -- confirm).
+ tuple.streamId = "stream1";
+ result = selector.select(tuple).iterator();
+ Assert.assertEquals("stream1", result.next());
+ Assert.assertFalse(result.hasNext());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
index 3997505..d01c3e0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import static org.mockito.Mockito.mock;
@@ -36,19 +37,23 @@ public class StormBoltCollectorTest extends AbstractTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
- for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+ for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
final Output flinkCollector = mock(Output.class);
Tuple flinkTuple = null;
final Values tuple = new Values();
StormBoltCollector<?> collector;
- if (numberOfAttributes == 0) {
- collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+ final String streamId = "streamId";
+ HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+ attributes.put(streamId, numberOfAttributes);
+
+ if (numberOfAttributes == -1) {
+ collector = new StormBoltCollector(attributes, flinkCollector);
tuple.add(new Integer(this.r.nextInt()));
} else {
- collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+ collector = new StormBoltCollector(attributes, flinkCollector);
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
for (int i = 0; i < numberOfAttributes; ++i) {
@@ -57,14 +62,13 @@ public class StormBoltCollectorTest extends AbstractTest {
}
}
- final String streamId = "streamId";
final Collection anchors = mock(Collection.class);
final List<Integer> taskIds;
taskIds = collector.emit(streamId, anchors, tuple);
Assert.assertNull(taskIds);
- if (numberOfAttributes == 0) {
+ if (numberOfAttributes == -1) {
verify(flinkCollector).collect(tuple.get(0));
} else {
verify(flinkCollector).collect(flinkTuple);
@@ -76,26 +80,26 @@ public class StormBoltCollectorTest extends AbstractTest {
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testReportError() {
- new StormBoltCollector<Object>(1, mock(Output.class)).reportError(null);
+ new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).reportError(null);
}
- @SuppressWarnings({"rawtypes", "unchecked"})
+ @SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testEmitDirect() {
- new StormBoltCollector<Object>(1, mock(Output.class)).emitDirect(0, null,
+ new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).emitDirect(0, null,
null, null);
}
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testAck() {
- new StormBoltCollector<Object>(1, mock(Output.class)).ack(null);
+ new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).ack(null);
}
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testFail() {
- new StormBoltCollector<Object>(1, mock(Output.class)).fail(null);
+ new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).fail(null);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 3e55d23..2491486 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -20,10 +20,16 @@ package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -34,18 +40,17 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.HashSet;
import java.util.Map;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
-public class StormBoltWrapperTest {
+public class StormBoltWrapperTest extends AbstractTest {
@Test(expected = IllegalArgumentException.class)
public void testWrapperRawType() throws Exception {
@@ -53,7 +58,8 @@ public class StormBoltWrapperTest {
declarer.declare(new Fields("dummy1", "dummy2"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
- new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
+ new StormBoltWrapper<Object, Object>(mock(IRichBolt.class),
+ new String[] { Utils.DEFAULT_STREAM_ID });
}
@Test(expected = IllegalArgumentException.class)
@@ -79,38 +85,40 @@ public class StormBoltWrapperTest {
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
- new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), false);
+ new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
}
@Test
public void testWrapper() throws Exception {
- for (int i = 0; i < 26; ++i) {
+ for (int i = -1; i < 26; ++i) { // -1 selects the raw (non-tuple) input case in testWrapper(int); 0..25 are tuple arities
this.testWrapper(i);
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void testWrapper(final int numberOfAttributes) throws Exception {
- assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
+ assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25));
Tuple flinkTuple = null;
String rawTuple = null;
- if (numberOfAttributes == 0) {
+ if (numberOfAttributes == -1) {
rawTuple = "test";
} else {
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
}
- String[] schema = new String[numberOfAttributes];
- if (numberOfAttributes == 0) {
+ String[] schema;
+ if (numberOfAttributes == -1) {
schema = new String[1];
+ } else {
+ schema = new String[numberOfAttributes];
}
for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
final StreamRecord record = mock(StreamRecord.class);
- if (numberOfAttributes == 0) {
+ if (numberOfAttributes == -1) {
when(record.getValue()).thenReturn(rawTuple);
} else {
when(record.getValue()).thenReturn(flinkTuple);
@@ -124,17 +132,63 @@ public class StormBoltWrapperTest {
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
- final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, null);
+ final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
wrapper.setup(mock(Output.class), taskContext);
wrapper.open(new Configuration());
wrapper.processElement(record);
- if (numberOfAttributes == 0) {
+ if (numberOfAttributes == -1) {
verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
} else {
verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
}
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testMultipleOutputStreams() throws Exception {
+ final boolean rawOutType1 = super.r.nextBoolean();
+ final boolean rawOutType2 = super.r.nextBoolean();
+ // The mocked input record yields 2 on the first getValue() call and 3 on the second.
+ final StreamRecord record = mock(StreamRecord.class);
+ when(record.getValue()).thenReturn(2).thenReturn(3);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ Output output = mock(Output.class);
+ // Collect the stream IDs randomly selected above for raw output; the set is handed to the wrapper below.
+ TestBolt bolt = new TestBolt();
+ HashSet<String> raw = new HashSet<String>();
+ if (rawOutType1) {
+ raw.add("stream1");
+ }
+ if (rawOutType2) {
+ raw.add("stream2");
+ }
+
+ final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null, raw);
+ wrapper.setup(output, taskContext);
+ wrapper.open(new Configuration());
+ // Expected record for "stream1": the bare value if that stream is raw, otherwise the value wrapped in a Tuple1.
+ SplitStreamType splitRecord = new SplitStreamType<Integer>();
+ if (rawOutType1) {
+ splitRecord.streamId = "stream1";
+ splitRecord.value = 2;
+ } else {
+ splitRecord.streamId = "stream1";
+ splitRecord.value = new Tuple1<Integer>(2);
+ }
+ wrapper.processElement(record); // first call: TestBolt emits to "stream1"
+ verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
+ if (rawOutType2) {
+ splitRecord.streamId = "stream2";
+ splitRecord.value = 3;
+ } else {
+ splitRecord.streamId = "stream2";
+ splitRecord.value = new Tuple1<Integer>(3);
+ }
+ wrapper.processElement(record); // second call: TestBolt emits to "stream2"
+ verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0)); // NOTE(review): expects two matching collect() calls although only one emit targets "stream2" -- presumably relies on the wrapper reusing its output objects; confirm
}
@SuppressWarnings("unchecked")
@@ -185,4 +239,40 @@ public class StormBoltWrapperTest {
verify(bolt).cleanup();
}
+ private static final class TestBolt implements IRichBolt {
+ private static final long serialVersionUID = 7278692872260138758L;
+ private OutputCollector collector;
+ // Test bolt that forwards the first attribute of each input, alternating between "stream1" and "stream2".
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+ // Number of execute() calls so far; its parity selects the output stream.
+ int counter = 0;
+ @Override
+ public void execute(backtype.storm.tuple.Tuple input) {
+ if (++counter % 2 == 1) { // odd calls (1st, 3rd, ...) go to "stream1"
+ this.collector.emit("stream1", new Values(input.getInteger(0)));
+ } else { // even calls go to "stream2"
+ this.collector.emit("stream2", new Values(input.getInteger(0)));
+ }
+ }
+
+ @Override
+ public void cleanup() {}
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream("stream1", new Fields("a1")); // both streams carry a single attribute
+ declarer.declareStream("stream2", new Fields("a2"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null; // supplies no configuration map
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
index cfde770..a28b6e5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
@@ -31,50 +32,60 @@ public class StormOutputFieldsDeclarerTest extends AbstractTest {
public void testDeclare() {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
- Assert.assertEquals(-1, declarer.getNumberOfAttributes());
+ int numberOfAttributes = this.r.nextInt(26); // random schema size in 0..25
+ declarer.declare(createSchema(numberOfAttributes));
+ Assert.assertEquals(1, declarer.outputSchemas.size()); // declare(...) registers exactly one stream ...
+ Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
+ .intValue()); // ... under the default stream ID, mapped to its attribute count
+ // A second, explicitly named stream must be recorded next to the default stream.
+ final String sid = "streamId";
+ numberOfAttributes = this.r.nextInt(26);
+ declarer.declareStream(sid, createSchema(numberOfAttributes));
+ Assert.assertEquals(2, declarer.outputSchemas.size());
+ Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
+ }
- final int numberOfAttributes = 1 + this.r.nextInt(25);
+ private Fields createSchema(final int numberOfAttributes) { // builds a schema with attribute names "a0", "a1", ... up to the requested count
final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
for (int i = 0; i < numberOfAttributes; ++i) {
schema.add("a" + i);
}
- declarer.declare(new Fields(schema));
- Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
+ return new Fields(schema); // an empty Fields object results when numberOfAttributes is 0
}
@Test
public void testDeclareDirect() {
- new StormOutputFieldsDeclarer().declare(false, null);
+ new StormOutputFieldsDeclarer().declare(false, new Fields());
}
@Test(expected = UnsupportedOperationException.class)
public void testDeclareDirectFail() {
- new StormOutputFieldsDeclarer().declare(true, null);
+ new StormOutputFieldsDeclarer().declare(true, new Fields());
}
@Test
public void testDeclareStream() {
- new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
+ new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
}
- @Test(expected = UnsupportedOperationException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testDeclareStreamFail() {
- new StormOutputFieldsDeclarer().declareStream(null, null);
+ new StormOutputFieldsDeclarer().declareStream(null, new Fields());
}
@Test
public void testDeclareFullStream() {
- new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
+ new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
}
- @Test(expected = UnsupportedOperationException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testDeclareFullStreamFailNonDefaultStream() {
- new StormOutputFieldsDeclarer().declareStream(null, false, null);
+ new StormOutputFieldsDeclarer().declareStream(null, false, new Fields());
}
@Test(expected = UnsupportedOperationException.class)
public void testDeclareFullStreamFailDirect() {
- new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+ new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
index e4826bb..36ed58a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
import java.util.List;
import static org.mockito.Mockito.mock;
@@ -35,19 +36,23 @@ public class StormSpoutCollectorTest extends AbstractTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
- for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+ for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
final SourceContext flinkCollector = mock(SourceContext.class);
Tuple flinkTuple = null;
final Values tuple = new Values();
StormSpoutCollector<?> collector;
- if (numberOfAttributes == 0) {
- collector = new StormSpoutCollector(numberOfAttributes, flinkCollector);
+ final String streamId = "streamId";
+ HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+ attributes.put(streamId, numberOfAttributes);
+
+ if (numberOfAttributes == -1) {
+ collector = new StormSpoutCollector(attributes, flinkCollector);
tuple.add(new Integer(this.r.nextInt()));
} else {
- collector = new StormSpoutCollector(numberOfAttributes, flinkCollector);
+ collector = new StormSpoutCollector(attributes, flinkCollector);
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
for (int i = 0; i < numberOfAttributes; ++i) {
@@ -56,7 +61,6 @@ public class StormSpoutCollectorTest extends AbstractTest {
}
}
- final String streamId = "streamId";
final List<Integer> taskIds;
final Object messageId = new Integer(this.r.nextInt());
@@ -64,7 +68,7 @@ public class StormSpoutCollectorTest extends AbstractTest {
Assert.assertNull(taskIds);
- if (numberOfAttributes == 0) {
+ if (numberOfAttributes == -1) {
verify(flinkCollector).collect(tuple.get(0));
} else {
verify(flinkCollector).collect(flinkTuple);
@@ -75,13 +79,15 @@ public class StormSpoutCollectorTest extends AbstractTest {
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testReportError() {
- new StormSpoutCollector<Object>(1, mock(SourceContext.class)).reportError(null);
+ new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class))
+ .reportError(null);
}
@SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testEmitDirect() {
- new StormSpoutCollector<Object>(1, mock(SourceContext.class)).emitDirect(0, null, null,
+ new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect(
+ 0, null, null,
(Object) null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
index 96e7b35..06d5399 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
@@ -60,7 +60,7 @@ public class StormTupleTest extends AbstractTest {
@Test
public void tupleTest() throws InstantiationException, IllegalAccessException {
- final int numberOfAttributes = 1 + this.r.nextInt(25);
+ final int numberOfAttributes = this.r.nextInt(26);
final Object[] data = new Object[numberOfAttributes];
final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
index 15129ce..7497ffc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
@@ -17,10 +17,14 @@
package org.apache.flink.stormcompatibility.wrappers;
+import java.util.HashMap;
+
import backtype.storm.topology.IComponent;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
@@ -29,29 +33,14 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import com.google.common.collect.Sets;
+
import static org.mockito.Mockito.mock;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormWrapperSetupHelperTest extends AbstractTest {
- @Test(expected = IllegalArgumentException.class)
- public void testZeroAttributesDeclarerBolt() throws Exception {
- IComponent boltOrSpout;
-
- if (this.r.nextBoolean()) {
- boltOrSpout = mock(IRichSpout.class);
- } else {
- boltOrSpout = mock(IRichBolt.class);
- }
-
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
- declarer.declare(new Fields());
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean());
- }
-
@Test
public void testEmptyDeclarerBolt() {
IComponent boltOrSpout;
@@ -62,7 +51,8 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
boltOrSpout = mock(IRichBolt.class);
}
- Assert.assertEquals(-1, StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean()));
+ Assert.assertEquals(new HashMap<String, Integer>(),
+ StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null));
}
@Test(expected = IllegalArgumentException.class)
@@ -79,7 +69,8 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
declarer.declare(new Fields("dummy1", "dummy2"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
- StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, true);
+ StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
+ Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
}
@Test(expected = IllegalArgumentException.class)
@@ -100,20 +91,22 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
- StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, false);
+ StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null);
}
@Test
public void testTupleTypes() throws Exception {
- for (int i = 0; i < 26; ++i) {
+ for (int i = -1; i < 26; ++i) {
this.testTupleTypes(i);
}
}
private void testTupleTypes(final int numberOfAttributes) throws Exception {
- String[] schema = new String[numberOfAttributes];
- if (numberOfAttributes == 0) {
+ String[] schema;
+ if (numberOfAttributes == -1) {
schema = new String[1];
+ } else {
+ schema = new String[numberOfAttributes];
}
for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
@@ -130,7 +123,13 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
- StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, numberOfAttributes == 0);
+ HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+ attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
+
+ Assert.assertEquals(attributes, StormWrapperSetupHelper.getNumberOfAttributes(
+ boltOrSpout,
+ numberOfAttributes == -1 ? Sets
+ .newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
index 4621650..430972b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
@@ -135,6 +135,7 @@ under the License.
<executions>
<!-- WordCount Spout source-->
+ <!-- example for embedded spout - for whole topologies see "WordCount Storm topology" example below -->
<execution>
<id>WordCount-SpoutSource</id>
<phase>package</phase>
@@ -176,6 +177,7 @@ under the License.
</execution>
<!-- WordCount Bolt tokenizer-->
+ <!-- example for embedded bolt - for whole topologies see "WordCount Storm topology" example below -->
<execution>
<id>WordCount-BoltTokenizer</id>
<phase>package</phase>
@@ -222,6 +224,7 @@ under the License.
</plugin>
<!-- WordCount Storm topology-->
+ <!-- example for whole topologies (i.e., if FlinkTopologyBuilder is used) -->
<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
index 7bcb7f9..ee5d9f9 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
@@ -26,6 +26,8 @@ import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import backtype.storm.utils.Utils;
+
/**
* Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
* files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
@@ -65,8 +67,9 @@ public class ExclamationWithStormBolt {
final DataStream<String> exclaimed = text
.transform("StormBoltTokenizer",
TypeExtractor.getForObject(""),
- new StormBoltWrapper<String, String>(new ExclamationBolt(), true))
- .map(new ExclamationMap());
+ new StormBoltWrapper<String, String>(new ExclamationBolt(),
+ new String[] { Utils.DEFAULT_STREAM_ID }))
+ .map(new ExclamationMap());
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
index f027eae..962a318 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
@@ -27,6 +27,8 @@ import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import backtype.storm.utils.Utils;
+
/**
* Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
* files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
@@ -84,6 +86,7 @@ public class ExclamationWithStormSpout {
// *************************************************************************
private static class ExclamationMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = -684993133807698042L;
@Override
public String map(String value) throws Exception {
@@ -126,13 +129,14 @@ public class ExclamationWithStormSpout {
final String[] tokens = textPath.split(":");
final String localFile = tokens[tokens.length - 1];
return env.addSource(
- new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile), true),
- TypeExtractor.getForClass(String.class)).setParallelism(1);
+ new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile),
+ new String[] { Utils.DEFAULT_STREAM_ID }),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
}
return env.addSource(
- new FiniteStormSpoutWrapper<String>(
- new FiniteStormInMemorySpout(WordCountData.WORDS), true),
+ new FiniteStormSpoutWrapper<String>(new FiniteStormInMemorySpout(
+ WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
TypeExtractor.getForClass(String.class)).setParallelism(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
new file mode 100644
index 0000000..4116f3c
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
+import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamMapper;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements a simple example with two declared output streams for the embedded Spout.
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>handle multiple output streams of a spout</li>
+ * <li>access each stream via .split(...) and .select(...)</li>
+ * <li>strip the wrapper data type SplitStreamType for further processing in Flink</li>
+ * </ul>
+ * <p/>
+ * This example would work the same way for multiple bolt output streams.
+ */
+public class SpoutSplitExample {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ String[] rawOutputs = new String[] { RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM };
+
+ final DataStream<SplitStreamType<Integer>> numbers = env.addSource(
+ new StormSpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
+ rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));
+
+ SplitDataStream<SplitStreamType<Integer>> splitStream = numbers
+ .split(new FlinkStormStreamSelector<Integer>());
+
+ DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
+ DataStream<SplitStreamType<Integer>> oddStream = splitStream.select(RandomSpout.ODD_STREAM);
+
+ evenStream.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich("even")).print();
+ oddStream.transform("oddBolt",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new StormBoltWrapper<SplitStreamType<Integer>, Tuple2<String, Integer>>(
+ new VerifyAndEnrichBolt(false)))
+ .print();
+
+ // execute program
+ env.execute("Spout split stream example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Same as {@link VerifyAndEnrichBolt}.
+ */
+ private final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 5213888269197438892L;
+ private final Tuple2<String, Integer> out;
+
+ public Enrich(String token) {
+ this.out = new Tuple2<String, Integer>(token, 0);
+ }
+
+ @Override
+ public Tuple2<String, Integer> map(Integer value) throws Exception {
+ this.out.setField(value, 1);
+ return this.out;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
new file mode 100644
index 0000000..75d710e
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split.stormoperators;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class RandomSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3978554318742509334L;
+
+ public static final String EVEN_STREAM = "even";
+ public static final String ODD_STREAM = "odd";
+
+ private final boolean split;
+ private Random r = new Random();
+ private SpoutOutputCollector collector;
+
+ public RandomSpout(boolean split, long seed) {
+ this.split = split;
+ this.r = new Random(seed);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ int i = r.nextInt();
+ if (split) {
+ if (i % 2 == 0) {
+ this.collector.emit(EVEN_STREAM, new Values(i));
+ } else {
+ this.collector.emit(ODD_STREAM, new Values(i));
+ }
+ } else {
+ this.collector.emit(new Values(i));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ Fields schema = new Fields("number");
+ if (split) {
+ declarer.declareStream(EVEN_STREAM, schema);
+ declarer.declareStream(ODD_STREAM, schema);
+ } else {
+ declarer.declare(schema);
+ }
+ }
+
+}