You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:01 UTC
[11/27] flink git commit: [storm-compat] Storm compatibility code
cleanup
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index d322cc5..6a5efdf 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -14,22 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-
-
-
-
-
/**
* A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
* program. It takes the Flink input tuples of type {@code IN} and transforms them into a {@link StormTuple}s that the
@@ -43,95 +39,88 @@ import backtype.storm.topology.IRichBolt;
*/
public class StormBoltWrapper<IN, OUT> extends StreamOperator<IN, OUT> {
private static final long serialVersionUID = -4788589118464155835L;
-
- /**
- * The wrapped Storm {@link IRichBolt bolt}.
- */
+
+ // The wrapped Storm {@link IRichBolt bolt}
private final IRichBolt bolt;
- /**
- * Number of attributes of the bolt's output tuples.
- */
+ // Number of attributes of the bolt's output tuples
private final int numberOfAttributes;
-
-
-
+
/**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
- * used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25} depending
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
+ * can be
+ * used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
+ * depending
* on the bolt's declared number of attributes.
- *
+ *
* @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
+ * The Storm {@link IRichBolt bolt} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [1;25].
*/
public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
this(bolt, false);
}
-
+
/**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
+ * can be
* used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
* output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of
* attributes.
- *
+ *
* @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
+ * The Storm {@link IRichBolt bolt} to be used.
* @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [1;25].
*/
public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException {
super(new FlinkDummyRichFunction());
this.bolt = bolt;
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
}
-
-
-
+
@Override
public void open(final Configuration parameters) throws Exception {
super.open(parameters);
-
- final StreamingRuntimeContext flinkContext = (StreamingRuntimeContext)((FlinkDummyRichFunction)super
- .getUserFunction()).getRuntimeContext();
-
+
+ final StreamingRuntimeContext flinkContext = (StreamingRuntimeContext) ((FlinkDummyRichFunction) super
+ .getUserFunction()).getRuntimeContext();
+
final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(flinkContext, false);
OutputCollector stormCollector = null;
-
- if(this.numberOfAttributes != -1) {
+
+ if (this.numberOfAttributes != -1) {
stormCollector = new OutputCollector(new StormCollector<OUT>(this.numberOfAttributes, super.collector));
}
-
+
this.bolt.prepare(null, topologyContext, stormCollector);
}
-
+
@Override
public void close() {
super.close();
this.bolt.cleanup();
}
-
+
/**
- * {@inheritDoc}
- *
* Transforms a Flink tuple into a Storm tuple and calls the bolt's {@code execute} method.
*/
@Override
protected void callUserFunction() throws Exception {
this.bolt.execute(new StormTuple<IN>(this.nextRecord.getObject()));
}
-
+
@Override
public void run() throws Exception {
- while(this.readNext() != null) {
+ while (this.readNext() != null) {
this.callUserFunctionAndLogException();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
index ed333b8..1c13e88 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
@@ -14,22 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.stormcompatibility.wrappers;
-import java.util.Collection;
-import java.util.List;
+/* we do not import
+ * --> "org.apache.flink.api.java.tuple.Tuple"
+ * or
+ * --> "backtype.storm.tuple.Tuple"
+ * to avoid confusion
+ */
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.task.IOutputCollector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.util.Collector;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.task.IOutputCollector;
-
-
-
-
+import java.util.Collection;
+import java.util.List;
/**
* A {@link StormCollector} is used by {@link AbstractStormSpoutWrapper} and {@link StormBoltWrapper} to provided an
@@ -38,119 +41,105 @@ import backtype.storm.task.IOutputCollector;
* {@link Collector}.
*/
class StormCollector<OUT> implements ISpoutOutputCollector, IOutputCollector {
- // we do not import
- // --> "org.apache.flink.api.java.tuple.Tuple"
- // or
- // --> "backtype.storm.tuple.Tuple"
- // to avoid confusion
-
- /**
- * The Flink collector.
- */
+
+ // The Flink collector
private final Collector<OUT> flinkCollector;
- /**
- * The Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}.
- */
+ // The Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}
private final org.apache.flink.api.java.tuple.Tuple outputTuple;
- /**
- * The number of attributes of the output tuples. (Determines the concrete type of {@link #outputTuple}).
- */
+ // The number of attributes of the output tuples. (Determines the concrete type of {@link #outputTuple})
private final int numberOfAttributes;
- /**
- * Is set to {@code true} each time a tuple is emitted.
- */
+ // Is set to {@code true} each time a tuple is emitted
boolean tupleEmitted = false;
-
-
-
+
/**
* Instantiates a new {@link StormCollector} that emits Flink tuples to the given Flink collector. If the number of
* attributes is specified as zero, any output type is supported. If the number of attributes is between 1 to 25,
* the output type is {@link Tuple1} to {@link Tuple25}.
- *
+ *
* @param numberOfAttributes
- * The number of attributes of the emitted tuples.
+ * The number of attributes of the emitted tuples.
* @param flinkCollector
- * The Flink collector to be used.
- *
+ * The Flink collector to be used.
* @throws UnsupportedOperationException
- * if the specified number of attributes is not in the valid range of [0,25]
+ * if the specified number of attributes is not in the valid range of [0,25]
*/
public StormCollector(final int numberOfAttributes, final Collector<OUT> flinkCollector)
- throws UnsupportedOperationException {
+ throws UnsupportedOperationException {
this.numberOfAttributes = numberOfAttributes;
this.flinkCollector = flinkCollector;
-
- if(this.numberOfAttributes <= 0) {
+
+ if (this.numberOfAttributes <= 0) {
this.outputTuple = null;
- } else if(this.numberOfAttributes <= 25) {
+ } else if (this.numberOfAttributes <= 25) {
try {
this.outputTuple = Tuple.getTupleClass(this.numberOfAttributes).newInstance();
- } catch(final InstantiationException e) {
+ } catch (final InstantiationException e) {
throw new RuntimeException(e);
- } catch(final IllegalAccessException e) {
+ } catch (final IllegalAccessException e) {
throw new RuntimeException(e);
}
} else {
throw new UnsupportedOperationException(
- "SimpleStormBoltWrapper can handle not more then 25 attributes, but " + this.numberOfAttributes
- + " are declared by the given bolt.");
+ "SimpleStormBoltWrapper can handle not more then 25 attributes, but " + this.numberOfAttributes
+ + " are declared by the given bolt");
}
}
-
-
-
+
@Override
public void reportError(final Throwable error) {
// not sure, if Flink can support this
- throw new UnsupportedOperationException("Not implemented yet.");
+ throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
- return this.emitImpl(streamId, null, tuple, messageId);
+ return this.emitImpl(tuple);
}
-
+
@Override
- public List<Integer> emit(final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple) {
- return this.emitImpl(streamId, anchors, tuple, null);
+ public List<Integer> emit(final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors,
+ final List<Object> tuple) {
+ return this.emitImpl(tuple);
}
-
+
@SuppressWarnings("unchecked")
- public List<Integer> emitImpl(final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple, final Object messageId) {
- if(this.numberOfAttributes > 0) {
+ public List<Integer> emitImpl(
+ final List<Object> tuple) {
+ if (this.numberOfAttributes > 0) {
assert (tuple.size() == this.numberOfAttributes);
- for(int i = 0; i < this.numberOfAttributes; ++i) {
+ for (int i = 0; i < this.numberOfAttributes; ++i) {
this.outputTuple.setField(tuple.get(i), i);
}
- this.flinkCollector.collect((OUT)this.outputTuple);
+ this.flinkCollector.collect((OUT) this.outputTuple);
} else {
assert (tuple.size() == 1);
- this.flinkCollector.collect((OUT)tuple.get(0));
+ this.flinkCollector.collect((OUT) tuple.get(0));
}
this.tupleEmitted = true;
-
- return null; // TODO
+
+ // TODO
+ return null;
}
-
+
@Override
public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}
-
+
@Override
- public void emitDirect(final int taskId, final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple) {
+ public void emitDirect(final int taskId, final String streamId,
+ final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}
-
+
@Override
public void ack(final backtype.storm.tuple.Tuple input) {
- throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink.");
+ throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
}
-
+
@Override
public void fail(final backtype.storm.tuple.Tuple input) {
- throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink.");
+ throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
index f72fcee..ebbf80f 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
@@ -14,17 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.topology.IRichSpout;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
-import backtype.storm.topology.IRichSpout;
-
-
-
-
-
/**
* A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calles {@link IRichSpout#nextTuple()
* nextTuple()} for finite number of times before {@link #run(org.apache.flink.util.Collector)} returns. The number of
@@ -33,113 +29,104 @@ import backtype.storm.topology.IRichSpout;
*/
public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
private static final long serialVersionUID = 3883246587044801286L;
-
- /**
- * The number of {@link IRichSpout#nextTuple()} calls.
- */
+
+ // The number of {@link IRichSpout#nextTuple()} calls
private int numberOfInvocations;
-
-
-
+
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
* method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
* output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
* attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
- *
+ * The Storm {@link IRichSpout spout} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [1;25].
*/
public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
this(spout, false, -1);
}
-
+
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one
+ * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be
+ * one
* of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @param numberOfInvocations
- * The number of calls to {@link IRichSpout#nextTuple()}.
- *
+ * The number of calls to {@link IRichSpout#nextTuple()}.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [1;25].
*/
public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations)
- throws IllegalArgumentException {
+ throws IllegalArgumentException {
this(spout, false, numberOfInvocations);
}
-
+
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
* method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
* output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
* output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
* {@link Tuple25} depending on the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- *
+ * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [1;25].
*/
public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
this(spout, rawOutput, -1);
}
-
+
/**
* Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
* method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
* type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
* {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on
* the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @param numberOfInvocations
- * The number of calls to {@link IRichSpout#nextTuple()}.
- *
+ * The number of calls to {@link IRichSpout#nextTuple()}.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [1;25].
*/
public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput, final int numberOfInvocations)
- throws IllegalArgumentException {
+ throws IllegalArgumentException {
super(spout, rawOutput);
this.numberOfInvocations = numberOfInvocations;
}
-
-
-
+
/**
* Calls {@link IRichSpout#nextTuple()} for the given number of times.
*/
@Override
protected void execute() {
- if(this.numberOfInvocations >= 0) {
- while((--this.numberOfInvocations >= 0) && super.isRunning) {
+ if (this.numberOfInvocations >= 0) {
+ while ((--this.numberOfInvocations >= 0) && super.isRunning) {
super.spout.nextTuple();
}
} else {
do {
super.collector.tupleEmitted = false;
super.spout.nextTuple();
- } while(super.collector.tupleEmitted && super.isRunning);
+ } while (super.collector.tupleEmitted && super.isRunning);
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
index a9c66ee..6005d6d 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
@@ -21,61 +21,54 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
-
-
-
-
/**
* {@link StormOutputFieldsDeclarer} is used by {@link StormBoltWrapper} to determine the number of attributes declared
* by the wrapped bolt's {@code declare(...)} method.
*/
class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
- /**
- * The output schema declared by the wrapped bolt.
- */
+
+ // The output schema declared by the wrapped bolt.
private Fields outputSchema = null;
-
-
-
+
@Override
public void declare(final Fields fields) {
this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
}
-
+
@Override
public void declare(final boolean direct, final Fields fields) {
this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
}
-
+
@Override
public void declareStream(final String streamId, final Fields fields) {
this.declareStream(streamId, false, fields);
}
-
+
@Override
public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if(!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
- throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink.");
+ if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
+ throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
}
- if(direct) {
+ if (direct) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}
-
+
this.outputSchema = fields;
}
-
+
/**
* Returns the number of attributes of the output schema declare by the wrapped bolt. If no output schema is
* declared (eg, for sink bolts), {@code -1} is returned.
- *
+ *
* @return the number of attributes of the output schema declare by the wrapped bolt
*/
public int getNumberOfAttributes() {
- if(this.outputSchema != null) {
+ if (this.outputSchema != null) {
return this.outputSchema.size();
}
-
+
return -1;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
index 0c7d62d..f5e0733 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
@@ -14,70 +14,65 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.topology.IRichSpout;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
-import backtype.storm.topology.IRichSpout;
-
-
-
-
-
/**
* A {@link StormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped spout's
* {@link IRichSpout#nextTuple() nextTuple()} method in in infinite loop.
*/
public class StormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
private static final long serialVersionUID = -218340336648247605L;
-
+
/**
- * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+ * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
+ * can
* be used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
* depending on the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
- *
+ * The Storm {@link IRichSpout spout} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [1;25].
*/
public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
super(spout, false);
}
-
+
/**
- * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+ * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
+ * can
* be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
* output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
* attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
- *
+ * The Storm {@link IRichSpout spout} to be used.
* @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- *
+ * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [1;25].
*/
public StormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
super(spout, rawOutput);
}
-
+
/**
* Calls {@link IRichSpout#nextTuple()} in an infinite loop until {@link #cancel()} is called.
*/
@Override
protected void execute() {
- while(super.isRunning) {
+ while (super.isRunning) {
super.spout.nextTuple();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
index 21f8de4..8019d7d 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
@@ -18,209 +18,202 @@
package org.apache.flink.stormcompatibility.wrappers;
-import java.util.List;
-
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-
-
-
+import java.util.List;
/**
* {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
*/
class StormTuple<IN> implements Tuple {
- /**
- * The storm representation of the original Flink tuple.
- */
+
+ // The storm representation of the original Flink tuple
private final Values stormTuple;
-
-
-
+
/**
* Create a new Storm tuple from the given Flink tuple.
- *
+ *
* @param flinkTuple
- * The Flink tuple to be converted.
+ * The Flink tuple to be converted.
*/
public StormTuple(final IN flinkTuple) {
- if(flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
- final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple)flinkTuple;
-
+ if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
+ final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
+
final int numberOfAttributes = t.getArity();
this.stormTuple = new Values();
- for(int i = 0; i < numberOfAttributes; ++i) {
+ for (int i = 0; i < numberOfAttributes; ++i) {
this.stormTuple.add(t.getField(i));
}
} else {
this.stormTuple = new Values(flinkTuple);
}
}
-
+
@Override
public int size() {
return this.stormTuple.size();
}
-
+
@Override
public boolean contains(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Fields getFields() {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public int fieldIndex(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public List<Object> select(final Fields selector) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Object getValue(final int i) {
return this.stormTuple.get(i);
}
-
+
@Override
public String getString(final int i) {
- return (String)this.stormTuple.get(i);
+ return (String) this.stormTuple.get(i);
}
-
+
@Override
public Integer getInteger(final int i) {
- return (Integer)this.stormTuple.get(i);
+ return (Integer) this.stormTuple.get(i);
}
-
+
@Override
public Long getLong(final int i) {
- return (Long)this.stormTuple.get(i);
+ return (Long) this.stormTuple.get(i);
}
-
+
@Override
public Boolean getBoolean(final int i) {
- return (Boolean)this.stormTuple.get(i);
+ return (Boolean) this.stormTuple.get(i);
}
-
+
@Override
public Short getShort(final int i) {
- return (Short)this.stormTuple.get(i);
+ return (Short) this.stormTuple.get(i);
}
-
+
@Override
public Byte getByte(final int i) {
- return (Byte)this.stormTuple.get(i);
+ return (Byte) this.stormTuple.get(i);
}
-
+
@Override
public Double getDouble(final int i) {
- return (Double)this.stormTuple.get(i);
+ return (Double) this.stormTuple.get(i);
}
-
+
@Override
public Float getFloat(final int i) {
- return (Float)this.stormTuple.get(i);
+ return (Float) this.stormTuple.get(i);
}
-
+
@Override
public byte[] getBinary(final int i) {
- return (byte[])this.stormTuple.get(i);
+ return (byte[]) this.stormTuple.get(i);
}
-
+
@Override
public Object getValueByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public String getStringByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Integer getIntegerByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Long getLongByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Boolean getBooleanByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Short getShortByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Byte getByteByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Double getDoubleByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public Float getFloatByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public byte[] getBinaryByField(final String field) {
throw new UnsupportedOperationException("Not implemented yet");
}
-
+
@Override
public List<Object> getValues() {
return this.stormTuple;
}
-
+
@Override
public GlobalStreamId getSourceGlobalStreamid() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
}
-
+
@Override
public String getSourceComponent() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
}
-
+
@Override
public int getSourceTask() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
}
-
+
@Override
public String getSourceStreamId() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
}
-
+
@Override
public MessageId getMessageId() {
// not sure if Flink can support this
throw new UnsupportedOperationException();
}
-
+
@Override
public int hashCode() {
final int prime = 31;
@@ -228,27 +221,27 @@ class StormTuple<IN> implements Tuple {
result = (prime * result) + ((this.stormTuple == null) ? 0 : this.stormTuple.hashCode());
return result;
}
-
+
@Override
public boolean equals(final Object obj) {
- if(this == obj) {
+ if (this == obj) {
return true;
}
- if(obj == null) {
+ if (obj == null) {
return false;
}
- if(this.getClass() != obj.getClass()) {
+ if (this.getClass() != obj.getClass()) {
return false;
}
- final StormTuple<?> other = (StormTuple<?>)obj;
- if(this.stormTuple == null) {
- if(other.stormTuple != null) {
+ final StormTuple<?> other = (StormTuple<?>) obj;
+ if (this.stormTuple == null) {
+ if (other.stormTuple != null) {
return false;
}
- } else if(!this.stormTuple.equals(other.stormTuple)) {
+ } else if (!this.stormTuple.equals(other.stormTuple)) {
return false;
}
return true;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
index ecbbe5c..a8f99e6 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -14,15 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.wrappers;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
@@ -32,84 +25,90 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IComponent;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-
-
+import java.util.HashMap;
+import java.util.Map;
/**
* {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or
* {@link StormBoltWrapper}.
*/
class StormWrapperSetupHelper {
-
+
/**
- * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper}.
- * Returns zero for raw output type or a value within range [1;25] for output type {@link Tuple1} to {@link Tuple25}
+ * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link
+ * StormBoltWrapper}.
+ * Returns zero for raw output type or a value within range [1;25] for output type {@link Tuple1} to {@link
+ * Tuple25}
* . In case of a data sink, {@code -1} is returned. .
- *
+ *
* @param spoutOrBolt
- * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
+ * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
* @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @return The number of attributes to be used.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [1;25].
*/
public static int getNumberOfAttributes(final IComponent spoutOrBolt, final boolean rawOutput)
- throws IllegalArgumentException {
+ throws IllegalArgumentException {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
spoutOrBolt.declareOutputFields(declarer);
-
+
final int declaredNumberOfAttributes = declarer.getNumberOfAttributes();
-
- if(declaredNumberOfAttributes == -1) {
+
+ if (declaredNumberOfAttributes == -1) {
return -1;
}
-
- if((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) {
+
+ if ((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) {
throw new IllegalArgumentException(
- "Provided bolt declares non supported number of output attributes. Must be in range [1;25] but was "
- + declaredNumberOfAttributes);
+ "Provided bolt declares non supported number of output attributes. Must be in range [1;25] but " +
+ "was "
+ + declaredNumberOfAttributes);
}
-
- if(rawOutput) {
- if(declaredNumberOfAttributes > 1) {
+
+ if (rawOutput) {
+ if (declaredNumberOfAttributes > 1) {
throw new IllegalArgumentException(
- "Ouput type is requested to be raw type, but provided bolt declares more then one output attribute.");
-
+ "Ouput type is requested to be raw type, but provided bolt declares more then one output " +
+ "attribute.");
+
}
return 0;
}
-
+
return declaredNumberOfAttributes;
}
-
- /**
- * TODO
- */
- public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context, final boolean spoutOrBolt) {
- final Integer taskId = new Integer(1 + context.getIndexOfThisSubtask());
-
+
+ // TODO
+ public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context,
+ final boolean spoutOrBolt) {
+ final Integer taskId = 1 + context.getIndexOfThisSubtask();
+
final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
taskToComponents.put(taskId, context.getTaskName());
-
+
final ComponentCommon common = new ComponentCommon();
common.set_parallelism_hint(context.getNumberOfParallelSubtasks());
-
+
final Map<String, Bolt> bolts = new HashMap<String, Bolt>();
final Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
-
- if(spoutOrBolt) {
+
+ if (spoutOrBolt) {
spoutSpecs.put(context.getTaskName(), new SpoutSpec(null, common));
} else {
bolts.put(context.getTaskName(), new Bolt(null, common));
}
-
+
return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
index 1902835..b55e1ef 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
@@ -14,164 +14,159 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-import java.util.LinkedList;
+package org.apache.flink.stormcompatibility.api;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-
-
-
+import java.util.LinkedList;
public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
-
+
@Test
public void testDeclare() {
- for(int i = 0; i < 4; ++i) {
- for(int j = 0; j <= 25; ++j) {
+ for (int i = 0; i < 4; ++i) {
+ for (int j = 0; j <= 25; ++j) {
this.runDeclareTest(i, j);
}
}
}
-
+
@Test(expected = IllegalArgumentException.class)
public void testDeclareSimpleToManyAttributes() {
this.runDeclareTest(0, 26);
}
-
+
@Test(expected = IllegalArgumentException.class)
public void testDeclareNonDirectToManyAttributes() {
this.runDeclareTest(1, 26);
}
-
+
@Test(expected = IllegalArgumentException.class)
public void testDeclareDefaultStreamToManyAttributes() {
this.runDeclareTest(2, 26);
}
-
+
@Test(expected = IllegalArgumentException.class)
public void testDeclareFullToManyAttributes() {
this.runDeclareTest(3, 26);
}
-
+
private void runDeclareTest(final int testCase, final int numberOfAttributes) {
final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer();
-
+
final String[] attributes = new String[numberOfAttributes];
- for(int i = 0; i < numberOfAttributes; ++i) {
+ for (int i = 0; i < numberOfAttributes; ++i) {
attributes[i] = "a" + i;
}
-
- switch(testCase) {
- case 0:
- this.declareSimple(declarere, attributes);
- break;
- case 1:
- this.declareNonDirect(declarere, attributes);
- break;
- case 2:
- this.declareDefaultStream(declarere, attributes);
- break;
- default:
- this.declareFull(declarere, attributes);
+
+ switch (testCase) {
+ case 0:
+ this.declareSimple(declarere, attributes);
+ break;
+ case 1:
+ this.declareNonDirect(declarere, attributes);
+ break;
+ case 2:
+ this.declareDefaultStream(declarere, attributes);
+ break;
+ default:
+ this.declareFull(declarere, attributes);
}
-
-
+
final TypeInformation<?> type = declarere.getOutputType();
-
- if(numberOfAttributes == 0) {
+
+ if (numberOfAttributes == 0) {
Assert.assertNull(type);
} else {
Assert.assertEquals(numberOfAttributes, type.getArity());
- if(numberOfAttributes == 1) {
+ if (numberOfAttributes == 1) {
Assert.assertFalse(type.isTupleType());
} else {
Assert.assertTrue(type.isTupleType());
}
}
}
-
+
private void declareSimple(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declare(new Fields(attributes));
}
-
+
private void declareNonDirect(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declare(false, new Fields(attributes));
}
-
+
private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes));
}
-
+
private void declareFull(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes));
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareDirect() {
new FlinkOutputFieldsDeclarer().declare(true, null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareNonDefaultStrem() {
new FlinkOutputFieldsDeclarer().declareStream("dummy", null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareDirect2() {
new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareNonDefaultStrem2() {
new FlinkOutputFieldsDeclarer().declareStream("dummy", this.r.nextBoolean(), null);
}
-
+
@Test
public void testGetGroupingFieldIndexes() {
final int numberOfAttributes = 5 + this.r.nextInt(21);
final String[] attributes = new String[numberOfAttributes];
- for(int i = 0; i < numberOfAttributes; ++i) {
+ for (int i = 0; i < numberOfAttributes; ++i) {
attributes[i] = "a" + i;
}
-
+
final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer();
declarere.declare(new Fields(attributes));
-
+
final int numberOfKeys = 1 + this.r.nextInt(25);
final LinkedList<String> groupingFields = new LinkedList<String>();
final boolean[] indexes = new boolean[numberOfAttributes];
-
- for(int i = 0; i < numberOfAttributes; ++i) {
- if(this.r.nextInt(26) < numberOfKeys) {
+
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ if (this.r.nextInt(26) < numberOfKeys) {
groupingFields.add(attributes[i]);
indexes[i] = true;
} else {
indexes[i] = false;
}
}
-
+
final int[] expectedResult = new int[groupingFields.size()];
int j = 0;
- for(int i = 0; i < numberOfAttributes; ++i) {
- if(indexes[i]) {
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ if (indexes[i]) {
expectedResult[j++] = i;
}
}
-
+
final int[] result = declarere.getGroupingFieldIndexes(groupingFields);
-
+
Assert.assertEquals(expectedResult.length, result.length);
- for(int i = 0; i < expectedResult.length; ++i) {
+ for (int i = 0; i < expectedResult.length; ++i) {
Assert.assertEquals(expectedResult[i], result[i]);
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
index 029a400..d214610 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
@@ -14,65 +14,61 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-import org.junit.Test;
+package org.apache.flink.stormcompatibility.api;
import backtype.storm.metric.api.ICombiner;
import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.IReducer;
-
-
-
-
+import org.junit.Test;
public class FlinkTopologyContextTest {
-
+
@Test(expected = UnsupportedOperationException.class)
public void testAddTaskHook() {
new FlinkTopologyContext(null, null, null).addTaskHook(null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testGetHooks() {
new FlinkTopologyContext(null, null, null).getHooks();
}
-
+
@SuppressWarnings("rawtypes")
@Test(expected = UnsupportedOperationException.class)
public void testRegisteredMetric1() {
- new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner)null, 0);
+ new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner) null, 0);
}
-
+
@SuppressWarnings("rawtypes")
@Test(expected = UnsupportedOperationException.class)
public void testRegisteredMetric2() {
- new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer)null, 0);
+ new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer) null, 0);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testRegisteredMetric3() {
- new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric)null, 0);
+ new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric) null, 0);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testGetRegisteredMetricByName() {
new FlinkTopologyContext(null, null, null).getRegisteredMetricByName(null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testSetAllSubscribedState() {
new FlinkTopologyContext(null, null, null).setAllSubscribedState(null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testSetSubscribedState1() {
new FlinkTopologyContext(null, null, null).setSubscribedState(null, null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testSetSubscribedState2() {
new FlinkTopologyContext(null, null, null).setSubscribedState(null, null, null);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
index 4f1f40f..f179919 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
@@ -14,52 +14,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.stormcompatibility.api;
import org.junit.Assert;
import org.junit.Test;
-
-
-
-
public class FlinkTopologyTest {
-
+
@Test
public void testDefaultParallelism() {
final FlinkTopology topology = new FlinkTopology(null);
Assert.assertEquals(1, topology.getParallelism());
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testExecute() throws Exception {
new FlinkTopology(null).execute();
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testExecuteWithName() throws Exception {
new FlinkTopology(null).execute(null);
}
-
+
@Test
public void testNumberOfTasks() {
final FlinkTopology topology = new FlinkTopology(null);
-
+
Assert.assertEquals(0, topology.getNumberOfTasks());
-
+
topology.increaseNumberOfTasks(3);
Assert.assertEquals(3, topology.getNumberOfTasks());
-
+
topology.increaseNumberOfTasks(2);
Assert.assertEquals(5, topology.getNumberOfTasks());
-
+
topology.increaseNumberOfTasks(8);
Assert.assertEquals(13, topology.getNumberOfTasks());
}
-
+
@Test(expected = AssertionError.class)
public void testAssert() {
new FlinkTopology(null).increaseNumberOfTasks(0);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
index 100ca87..94a50cf 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
@@ -14,29 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.util;
-import java.util.Random;
+package org.apache.flink.stormcompatibility.util;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-
-
+import java.util.Random;
public abstract class AbstractTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
-
+
protected long seed;
protected Random r;
-
+
@Before
public void prepare() {
this.seed = System.currentTimeMillis();
this.r = new Random(this.seed);
LOG.info("Test seed: {}", new Long(this.seed));
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
index 52136a9..4182ad0 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
@@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.wrappers;
-import java.util.Map;
+package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
@@ -25,59 +24,55 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-
-
-
+import java.util.Map;
class FiniteTestSpout implements IRichSpout {
private static final long serialVersionUID = 7992419478267824279L;
-
+
private int numberOfOutputTuples;
private SpoutOutputCollector collector;
-
-
-
+
public FiniteTestSpout(final int numberOfOutputTuples) {
this.numberOfOutputTuples = numberOfOutputTuples;
}
-
-
-
+
+ @SuppressWarnings("rawtypes")
@Override
- public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context, @SuppressWarnings("hiding") final SpoutOutputCollector collector) {
+ public void open(final Map conf, final TopologyContext context,
+ final SpoutOutputCollector collector) {
this.collector = collector;
}
-
+
@Override
public void close() {/* nothing to do */}
-
+
@Override
public void activate() {/* nothing to do */}
-
+
@Override
public void deactivate() {/* nothing to do */}
-
+
@Override
public void nextTuple() {
- if(--this.numberOfOutputTuples >= 0) {
- this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
+ if (--this.numberOfOutputTuples >= 0) {
+ this.collector.emit(new Values(this.numberOfOutputTuples));
}
}
-
+
@Override
public void ack(final Object msgId) {/* nothing to do */}
-
+
@Override
public void fail(final Object msgId) {/* nothing to do */}
-
+
@Override
public void declareOutputFields(final OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("dummy"));
}
-
+
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
index 7ecc98c..2c2a221 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
@@ -14,28 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.wrappers;
-import static org.mockito.Mockito.mock;
+package org.apache.flink.stormcompatibility.wrappers;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.junit.Assert;
import org.junit.Test;
-
-
-
+import static org.mockito.Mockito.mock;
public class FlinkDummyRichFunctionTest {
-
+
@Test
public void testRuntimeContext() {
final FlinkDummyRichFunction dummy = new FlinkDummyRichFunction();
-
+
final RuntimeContext context = mock(RuntimeContext.class);
dummy.setRuntimeContext(context);
-
+
Assert.assertSame(context, dummy.getRuntimeContext());
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 50ae763..f2cfe59 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -17,16 +17,10 @@
package org.apache.flink.stormcompatibility.wrappers;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
@@ -41,177 +35,168 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-
-
-
+import java.util.Map;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
public class StormBoltWrapperTest {
-
- @SuppressWarnings("unused")
+
@Test(expected = IllegalArgumentException.class)
public void testWrapperRawType() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy1", "dummy2"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
+
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
}
-
- @SuppressWarnings("unused")
+
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes1() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
final String[] schema = new String[26];
- for(int i = 0; i < schema.length; ++i) {
+ for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
+
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
}
-
- @SuppressWarnings("unused")
+
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes2() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
final String[] schema = new String[26];
- for(int i = 0; i < schema.length; ++i) {
+ for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
+
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), false);
}
-
+
@Test
public void testWrapper() throws Exception {
- for(int i = 0; i < 26; ++i) {
+ for (int i = 0; i < 26; ++i) {
this.testWrapper(i);
}
}
-
+
@SuppressWarnings({"rawtypes", "unchecked"})
private void testWrapper(final int numberOfAttributes) throws Exception {
assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
-
+
Tuple flinkTuple = null;
String rawTuple = null;
-
- if(numberOfAttributes == 0) {
- rawTuple = new String("test");
+
+ if (numberOfAttributes == 0) {
+ rawTuple = "test";
} else {
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
}
-
+
String[] schema = new String[numberOfAttributes];
- if(numberOfAttributes == 0) {
+ if (numberOfAttributes == 0) {
schema = new String[1];
}
- for(int i = 0; i < schema.length; ++i) {
+ for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
-
-
+
final StreamRecord record = mock(StreamRecord.class);
- if(numberOfAttributes == 0) {
+ if (numberOfAttributes == 0) {
when(record.getObject()).thenReturn(rawTuple);
} else {
when(record.getObject()).thenReturn(flinkTuple);
}
-
+
final StreamRecordSerializer serializer = mock(StreamRecordSerializer.class);
when(serializer.createInstance()).thenReturn(record);
-
+
final IndexedReaderIterator reader = mock(IndexedReaderIterator.class);
when(reader.next(record)).thenReturn(record).thenReturn(null);
-
+
final StreamTaskContext taskContext = mock(StreamTaskContext.class);
when(taskContext.getInputSerializer(0)).thenReturn(serializer);
when(taskContext.getIndexedInput(0)).thenReturn(reader);
-
-
-
+
final IRichBolt bolt = mock(IRichBolt.class);
-
+
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields(schema));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
+
final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
wrapper.setup(taskContext);
-
-
-
+
wrapper.callUserFunction();
- if(numberOfAttributes == 0) {
+ if (numberOfAttributes == 0) {
verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
} else {
verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
}
-
-
-
+
wrapper.run();
- if(numberOfAttributes == 0) {
+ if (numberOfAttributes == 0) {
verify(bolt, times(2)).execute(eq(new StormTuple<String>(rawTuple)));
} else {
verify(bolt, times(2)).execute(eq(new StormTuple<Tuple>(flinkTuple)));
}
}
-
+
@Test
public void testOpen() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
-
+
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
+
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-
+
wrapper.open(mock(Configuration.class));
-
+
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
}
-
+
@Test
public void testOpenSink() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-
+
wrapper.open(mock(Configuration.class));
-
+
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
}
-
+
@SuppressWarnings("unchecked")
@Test
public void testClose() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
-
+
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
+
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-
+
final StreamTaskContext<Object> taskContext = mock(StreamTaskContext.class);
when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
wrapper.setup(taskContext);
-
+
wrapper.close();
verify(bolt).cleanup();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
index 660fcea..925da04 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
@@ -17,109 +17,104 @@
package org.apache.flink.stormcompatibility.wrappers;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.util.Collection;
-import java.util.List;
-
+import backtype.storm.tuple.Values;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-import backtype.storm.tuple.Values;
-
-
-
+import java.util.Collection;
+import java.util.List;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
public class StormCollectorTest extends AbstractTest {
-
+
@Test
public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
- for(int i = 0; i < 26; ++i) {
+ for (int i = 0; i < 26; ++i) {
this.testStromCollector(true, i);
}
}
-
+
@Test
public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
- for(int i = 0; i < 26; ++i) {
+ for (int i = 0; i < 26; ++i) {
this.testStromCollector(false, i);
}
}
-
- @SuppressWarnings({"unchecked", "rawtypes"})
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
private void testStromCollector(final boolean spoutTest, final int numberOfAttributes)
- throws InstantiationException, IllegalAccessException {
+ throws InstantiationException, IllegalAccessException {
assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
-
+
final Collector flinkCollector = mock(Collector.class);
Tuple flinkTuple = null;
final Values tuple = new Values();
-
- StormCollector<?> collector = null;
-
- if(numberOfAttributes == 0) {
+
+ StormCollector<?> collector;
+
+ if (numberOfAttributes == 0) {
collector = new StormCollector(numberOfAttributes, flinkCollector);
- tuple.add(new Integer(this.r.nextInt()));
-
+ tuple.add(this.r.nextInt());
+
} else {
collector = new StormCollector(numberOfAttributes, flinkCollector);
flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-
- for(int i = 0; i < numberOfAttributes; ++i) {
- tuple.add(new Integer(this.r.nextInt()));
+
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ tuple.add(this.r.nextInt());
flinkTuple.setField(tuple.get(i), i);
}
}
-
+
final String streamId = "streamId";
final Collection anchors = mock(Collection.class);
final List<Integer> taskIds;
- final Object messageId = new Integer(this.r.nextInt());
- if(spoutTest) {
+ final Object messageId = this.r.nextInt();
+ if (spoutTest) {
taskIds = collector.emit(streamId, tuple, messageId);
} else {
taskIds = collector.emit(streamId, anchors, tuple);
}
-
+
Assert.assertNull(taskIds);
-
- if(numberOfAttributes == 0) {
+
+ if (numberOfAttributes == 0) {
verify(flinkCollector).collect(tuple.get(0));
} else {
verify(flinkCollector).collect(flinkTuple);
}
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testReportError() {
new StormCollector<Object>(1, null).reportError(null);
}
-
+
@SuppressWarnings({"rawtypes", "unchecked"})
@Test(expected = UnsupportedOperationException.class)
public void testBoltEmitDirect() {
- new StormCollector<Object>(1, null).emitDirect(0, (String)null, (Collection)null, (List)null);
+ new StormCollector<Object>(1, null).emitDirect(0, null, (Collection) null, null);
}
-
- @SuppressWarnings({"rawtypes", "unchecked"})
+
+ @SuppressWarnings("unchecked")
@Test(expected = UnsupportedOperationException.class)
public void testSpoutEmitDirect() {
- new StormCollector<Object>(1, null).emitDirect(0, (String)null, (List)null, (Object)null);
+ new StormCollector<Object>(1, null).emitDirect(0, null, null, (Object) null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testAck() {
new StormCollector<Object>(1, null).ack(null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testFail() {
new StormCollector<Object>(1, null).fail(null);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index 3c4f5aa..0c5b124 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -14,14 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.wrappers;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import java.util.LinkedList;
+package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -32,80 +29,79 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
-
-
-
+import java.util.LinkedList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormFiniteSpoutWrapperTest extends AbstractTest {
-
+
@Test
public void testRunExecuteFixedNumber() throws Exception {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
+
final IRichSpout spout = mock(IRichSpout.class);
final int numberOfCalls = this.r.nextInt(50);
final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-
+
spoutWrapper.run(null);
verify(spout, times(numberOfCalls)).nextTuple();
}
-
+
@Test
public void testRunExecute() throws Exception {
final int numberOfCalls = this.r.nextInt(50);
-
+
final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
- for(int i = numberOfCalls - 1; i >= 0; --i) {
- expectedResult.add(new Tuple1<Integer>(new Integer(i)));
+ for (int i = numberOfCalls - 1; i >= 0; --i) {
+ expectedResult.add(new Tuple1<Integer>(i));
}
-
+
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
- spout);
+ spout);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-
+
final TestCollector collector = new TestCollector();
spoutWrapper.run(collector);
-
+
Assert.assertEquals(expectedResult, collector.result);
}
-
+
@Test
public void testCancel() throws Exception {
final int numberOfCalls = 5 + this.r.nextInt(5);
-
+
final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
- expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
-
+ expectedResult.add(new Tuple1<Integer>(numberOfCalls - 1));
+
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
- spout);
+ spout);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-
+
spoutWrapper.cancel();
final TestCollector collector = new TestCollector();
spoutWrapper.run(collector);
-
+
Assert.assertEquals(expectedResult, collector.result);
}
-
+
@Test
public void testClose() throws Exception {
final IRichSpout spout = mock(IRichSpout.class);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
- spout);
-
+ spout);
+
spoutWrapper.close();
-
+
verify(spout).close();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
index 5a4c05b..a72eb19 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -17,66 +17,64 @@
package org.apache.flink.stormcompatibility.wrappers;
-import java.util.ArrayList;
-
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-
-
-
+import java.util.ArrayList;
public class StormOutputFieldsDeclarerTest extends AbstractTest {
-
+
@Test
public void testDeclare() {
final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
-
+
Assert.assertEquals(-1, declarer.getNumberOfAttributes());
-
+
final int numberOfAttributes = 1 + this.r.nextInt(25);
final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
- for(int i = 0; i < numberOfAttributes; ++i) {
+ for (int i = 0; i < numberOfAttributes; ++i) {
schema.add("a" + i);
}
declarer.declare(new Fields(schema));
Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
}
-
+
+ @SuppressWarnings("unused")
public void testDeclareDirect() {
new StormOutputFieldsDeclarer().declare(false, null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareDirectFail() {
new StormOutputFieldsDeclarer().declare(true, null);
}
-
+
+ @SuppressWarnings("unused")
public void testDeclareStream() {
new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareStreamFail() {
new StormOutputFieldsDeclarer().declareStream(null, null);
}
-
+
+ @SuppressWarnings("unused")
public void testDeclareFullStream() {
new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareFullStreamFailNonDefaultStream() {
new StormOutputFieldsDeclarer().declareStream(null, false, null);
}
-
+
@Test(expected = UnsupportedOperationException.class)
public void testDeclareFullStreamFailDirect() {
new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 88d855c..48f680a 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -14,13 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.wrappers;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import java.util.LinkedList;
+package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.topology.IRichSpout;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -30,39 +27,38 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import backtype.storm.topology.IRichSpout;
-
-
-
+import java.util.LinkedList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormSpoutWrapperTest extends AbstractTest {
-
+
@Test
public void testRunExecuteCancelInfinite() throws Exception {
final int numberOfCalls = 5 + this.r.nextInt(5);
-
+
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-
+
spoutWrapper.cancel();
final TestCollector collector = new TestCollector();
spoutWrapper.run(collector);
-
+
Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
}
-
+
@Test
public void testClose() throws Exception {
final IRichSpout spout = mock(IRichSpout.class);
final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
-
+
spoutWrapper.close();
-
+
verify(spout).close();
}
-
+
}