You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:27 UTC
[02/15] flink git commit: [Storm Compatibility] Maven module
restructuring and cleanup - removed storm-parent;
renamed storm-core and storm-examples - updated internal Java package
structure * renamed package "stormcompatibility" to "storm" *
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
new file mode 100644
index 0000000..f51aba4
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * Base class for tests that draw pseudo-random input.
+ * <p>
+ * Before each test method a new seed is taken from the system clock, a {@link Random} is created
+ * from it, and the seed is logged so that a failing run can be repeated with the same numbers.
+ */
+public abstract class AbstractTest {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
+
+    /** Seed that was used to create {@link #r} for the current test method. */
+    protected long seed;
+    /** Random number generator seeded with {@link #seed}. */
+    protected Random r;
+
+    /**
+     * Re-seeds the random number generator and logs the chosen seed.
+     */
+    @Before
+    public void prepare() {
+        this.seed = System.currentTimeMillis();
+        this.r = new Random(this.seed);
+        // Long.valueOf is the recommended replacement for the boxing constructor new Long(long).
+        LOG.info("Test seed: {}", Long.valueOf(this.seed));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
new file mode 100644
index 0000000..1b320e5
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Test spout that emits a fixed number of single-field tuples and is silent afterwards.
+ * <p>
+ * For a spout created with {@code n > 0}, successive calls to {@link #nextTuple()} emit the
+ * values {@code n-1, n-2, ..., 0} in the single declared field {@code "dummy"}. A spout created
+ * with {@code n <= 0} never emits.
+ */
+public class FiniteTestSpout implements IRichSpout {
+    private static final long serialVersionUID = 7992419478267824279L;
+
+    /** Number of tuples that still have to be emitted. */
+    private int numberOfOutputTuples;
+    /** Collector handed over in {@link #open(Map, TopologyContext, SpoutOutputCollector)}. */
+    private SpoutOutputCollector collector;
+
+    /**
+     * @param numberOfOutputTuples
+     *            the number of tuples this spout emits in total
+     */
+    public FiniteTestSpout(final int numberOfOutputTuples) {
+        this.numberOfOutputTuples = numberOfOutputTuples;
+    }
+
+    /**
+     * Stores the collector; {@code conf} and {@code context} are not used.
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void close() {/* nothing to do */}
+
+    @Override
+    public void activate() {/* nothing to do */}
+
+    @Override
+    public void deactivate() {/* nothing to do */}
+
+    /**
+     * Emits the next countdown value, or nothing once all tuples have been emitted.
+     */
+    @Override
+    public void nextTuple() {
+        // Test before decrementing: the counter stops at 0 instead of running further into the
+        // negative range on every call, where it could eventually wrap around and emit again.
+        if (this.numberOfOutputTuples > 0) {
+            this.collector.emit(new Values(Integer.valueOf(--this.numberOfOutputTuples)));
+        }
+    }
+
+    @Override
+    public void ack(final Object msgId) {/* nothing to do */}
+
+    @Override
+    public void fail(final Object msgId) {/* nothing to do */}
+
+    /**
+     * Declares a single output field named {@code "dummy"}.
+     */
+    @Override
+    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("dummy"));
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
new file mode 100644
index 0000000..17de427
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link StormStreamSelector}.
+ */
+public class StormStreamSelectorTest {
+
+    /**
+     * Checks that selecting on a {@link SplitStreamType} yields exactly one name, namely the
+     * stream ID currently stored in the tuple. The first ID is used again after switching to a
+     * second one.
+     */
+    @Test
+    public void testSelector() {
+        final StormStreamSelector<Object> selector = new StormStreamSelector<Object>();
+        final SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+
+        for (final String expectedId : new String[] { "stream1", "stream2", "stream1" }) {
+            tuple.streamId = expectedId;
+
+            final Iterator<String> selected = selector.select(tuple).iterator();
+            Assert.assertEquals(expectedId, selected.next());
+            // no further output name may be selected
+            Assert.assertFalse(selected.hasNext());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
new file mode 100644
index 0000000..b7458df
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/**
+ * Test bolt with two output streams that exposes the configuration it was prepared with.
+ * <p>
+ * Input is forwarded to {@link #shuffleStreamId} only by the task with index 0. In addition, each
+ * instance emits its own {@link TopologyContext} once to {@link #groupingStreamId}, on the first
+ * call to {@link #execute(Tuple)}.
+ */
+public class TestDummyBolt implements IRichBolt {
+    private static final long serialVersionUID = 6893611247443121322L;
+
+    public final static String shuffleStreamId = "shuffleStream";
+    public final static String groupingStreamId = "groupingStream";
+
+    /** Configuration received in {@code prepare}; public so that tests can inspect it. */
+    @SuppressWarnings("rawtypes")
+    public Map config;
+    private TopologyContext context;
+    private OutputCollector collector;
+    /** {@code true} until the context was emitted to the grouping stream. */
+    private boolean emit = true;
+
+    /**
+     * Remembers all three arguments for later use.
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.context = context;
+        this.config = stormConf;
+    }
+
+    /**
+     * Forwards the input values on the shuffle stream (task index 0 only) and emits the context
+     * on the grouping stream if that has not happened yet.
+     */
+    @Override
+    public void execute(Tuple input) {
+        final boolean isFirstTask = this.context.getThisTaskIndex() == 0;
+        if (isFirstTask) {
+            this.collector.emit(shuffleStreamId, input.getValues());
+        }
+
+        if (!this.emit) {
+            return;
+        }
+        this.collector.emit(groupingStreamId, new Values("bolt", this.context));
+        this.emit = false;
+    }
+
+    @Override
+    public void cleanup() {}
+
+    /**
+     * Declares the shuffle stream with field {@code "data"} and the grouping stream with fields
+     * {@code "id"} and {@code "data"}.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declareStream(shuffleStreamId, new Fields("data"));
+        declarer.declareStream(groupingStreamId, new Fields("id", "data"));
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
new file mode 100644
index 0000000..ed9ffff
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * Test spout that emits its own {@link TopologyContext} exactly once and exposes the
+ * configuration it was opened with.
+ */
+public class TestDummySpout implements IRichSpout {
+    private static final long serialVersionUID = -5190945609124603118L;
+
+    public final static String spoutStreamId = "spout-stream";
+
+    /** Configuration received in {@code open}; public so that tests can inspect it. */
+    @SuppressWarnings("rawtypes")
+    public Map config;
+    private TopologyContext context;
+    private SpoutOutputCollector collector;
+    /** {@code true} until the context was emitted. */
+    private boolean emit = true;
+
+    /**
+     * Remembers all three arguments for later use.
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.context = context;
+        this.config = conf;
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void activate() {}
+
+    @Override
+    public void deactivate() {}
+
+    /**
+     * Emits the context (without naming a stream) on the first call; later calls do nothing.
+     */
+    @Override
+    public void nextTuple() {
+        if (!this.emit) {
+            return;
+        }
+        this.collector.emit(new Values(this.context));
+        this.emit = false;
+    }
+
+    @Override
+    public void ack(Object msgId) {}
+
+    @Override
+    public void fail(Object msgId) {}
+
+    /**
+     * Declares the default stream with field {@code "data"} and {@link #spoutStreamId} with
+     * fields {@code "id"} and {@code "data"}.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data"));
+        declarer.declareStream(spoutStreamId, new Fields("id", "data"));
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
new file mode 100644
index 0000000..59939fd
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Test sink that collects {@link TopologyContext} objects in the static list {@link #result}:
+ * its own context when it is prepared, and one context from every input tuple.
+ */
+public class TestSink implements IRichBolt {
+    private static final long serialVersionUID = 4314871456719370877L;
+
+    /** All contexts seen so far; shared by all instances of this class. */
+    public final static List<TopologyContext> result = new LinkedList<TopologyContext>();
+
+    /**
+     * Records the sink's own context; the other arguments are not used.
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        result.add(context);
+    }
+
+    /**
+     * Records the context carried by the input: field 0 of a single-field tuple, field 1 otherwise.
+     */
+    @Override
+    public void execute(Tuple input) {
+        final int contextIndex = (input.size() == 1) ? 0 : 1;
+        result.add((TopologyContext) input.getValue(contextIndex));
+    }
+
+    @Override
+    public void cleanup() {}
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
new file mode 100644
index 0000000..3d7d26b
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.BoltCollector;
+import org.apache.flink.streaming.api.operators.Output;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link BoltCollector}.
+ */
+public class BoltCollectorTest extends AbstractTest {
+
+    /**
+     * Emits one Storm tuple for every attribute count from -1 to 25 and verifies what is handed
+     * to the mocked Flink {@link Output}: for -1 the single raw value itself, otherwise a Flink
+     * {@link Tuple} of matching arity that holds the same values.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
+        for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
+            final Output flinkCollector = mock(Output.class);
+            Tuple flinkTuple = null;
+            final Values tuple = new Values();
+
+            final String streamId = "streamId";
+            final HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+            attributes.put(streamId, numberOfAttributes);
+
+            // The collector is built the same way for raw and tuple output, so create it once.
+            final BoltCollector<?> collector = new BoltCollector(attributes, flinkCollector);
+
+            if (numberOfAttributes == -1) {
+                tuple.add(Integer.valueOf(this.r.nextInt()));
+            } else {
+                flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+                for (int i = 0; i < numberOfAttributes; ++i) {
+                    tuple.add(Integer.valueOf(this.r.nextInt()));
+                    flinkTuple.setField(tuple.get(i), i);
+                }
+            }
+
+            final Collection anchors = mock(Collection.class);
+            final List<Integer> taskIds;
+            taskIds = collector.emit(streamId, anchors, tuple);
+
+            Assert.assertNull(taskIds);
+
+            if (numberOfAttributes == -1) {
+                verify(flinkCollector).collect(tuple.get(0));
+            } else {
+                verify(flinkCollector).collect(flinkTuple);
+            }
+        }
+    }
+
+    /**
+     * {@code emitDirect} is expected to throw {@link UnsupportedOperationException}.
+     */
+    @SuppressWarnings("unchecked")
+    @Test(expected = UnsupportedOperationException.class)
+    public void testEmitDirect() {
+        new BoltCollector<Object>(mock(HashMap.class), mock(Output.class)).emitDirect(0, null,
+                null, null);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
new file mode 100644
index 0000000..e33fdb9
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.apache.flink.storm.wrappers.StormTuple;
+import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link BoltWrapper}.
+ * <p>
+ * The class runs with the PowerMock runner because several tests use
+ * {@code PowerMockito.whenNew} to substitute a pre-filled {@link SetupOutputFieldsDeclarer} for
+ * every no-argument construction of that class (presumably this is how the wrapper obtains the
+ * bolt's output schema -- confirm against {@link WrapperSetupHelper}).
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class})
+public class BoltWrapperTest extends AbstractTest {
+
+ /**
+ * A declared schema with two attributes together with the default stream passed in the
+ * String[] argument (presumably the set of raw output streams) must be rejected.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrapperRawType() throws Exception {
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+ declarer.declare(new Fields("dummy1", "dummy2"));
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+ new BoltWrapper<Object, Object>(mock(IRichBolt.class),
+ new String[] { Utils.DEFAULT_STREAM_ID });
+ }
+
+ /**
+ * A schema with 26 attributes (one more than the largest arity used in testWrapper) must be
+ * rejected by the single-argument constructor.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrapperToManyAttributes1() throws Exception {
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+ final String[] schema = new String[26];
+ for (int i = 0; i < schema.length; ++i) {
+ schema[i] = "a" + i;
+ }
+ declarer.declare(new Fields(schema));
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+ new BoltWrapper<Object, Object>(mock(IRichBolt.class));
+ }
+
+ /**
+ * Same as testWrapperToManyAttributes1, but using the constructor that takes a String[]
+ * (given here as an empty array).
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrapperToManyAttributes2() throws Exception {
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+ final String[] schema = new String[26];
+ for (int i = 0; i < schema.length; ++i) {
+ schema[i] = "a" + i;
+ }
+ declarer.declare(new Fields(schema));
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+ new BoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
+ }
+
+ /**
+ * Runs the parameterized helper for every attribute count from -1 (raw input value) to 25.
+ */
+ @Test
+ public void testWrapper() throws Exception {
+ for (int i = -1; i < 26; ++i) {
+ this.testWrapper(i);
+ }
+ }
+
+ /**
+ * Feeds one record into a wrapped mock bolt and verifies that the bolt is executed with an
+ * equal StormTuple.
+ *
+ * @param numberOfAttributes
+ * arity of the Flink input tuple, or -1 to use the raw String "test" as input
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private void testWrapper(final int numberOfAttributes) throws Exception {
+ assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25));
+ Tuple flinkTuple = null;
+ String rawTuple = null;
+
+ if (numberOfAttributes == -1) {
+ rawTuple = "test";
+ } else {
+ flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+ }
+
+ // a raw value is declared with a schema of a single attribute
+ final String[] schema;
+ if (numberOfAttributes == -1) {
+ schema = new String[1];
+ } else {
+ schema = new String[numberOfAttributes];
+ }
+ for (int i = 0; i < schema.length; ++i) {
+ schema[i] = "a" + i;
+ }
+
+ final StreamRecord record = mock(StreamRecord.class);
+ if (numberOfAttributes == -1) {
+ when(record.getValue()).thenReturn(rawTuple);
+ } else {
+ when(record.getValue()).thenReturn(flinkTuple);
+ }
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
+
+ final IRichBolt bolt = mock(IRichBolt.class);
+
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+ declarer.declare(new Fields(schema));
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+ final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
+ wrapper.setup(mock(Output.class), taskContext);
+ wrapper.open(null);
+
+ wrapper.processElement(record);
+ if (numberOfAttributes == -1) {
+ verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
+ } else {
+ verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
+ }
+ }
+
+ /**
+ * Wraps TestBolt, which alternates between "stream1" and "stream2", and verifies the
+ * SplitStreamType records collected by the output. Whether each stream is declared as raw
+ * output is chosen randomly per run (seed logged by AbstractTest).
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testMultipleOutputStreams() throws Exception {
+ final boolean rawOutType1 = super.r.nextBoolean();
+ final boolean rawOutType2 = super.r.nextBoolean();
+
+ // consecutive stubbing: the first getValue() call yields 2, later calls yield 3
+ final StreamRecord record = mock(StreamRecord.class);
+ when(record.getValue()).thenReturn(2).thenReturn(3);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
+
+ final Output output = mock(Output.class);
+
+ final TestBolt bolt = new TestBolt();
+ final HashSet<String> raw = new HashSet<String>();
+ if (rawOutType1) {
+ raw.add("stream1");
+ }
+ if (rawOutType2) {
+ raw.add("stream2");
+ }
+
+ final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null, raw);
+ wrapper.setup(output, taskContext);
+ wrapper.open(null);
+
+ // expected record for the first element: the plain value for a raw stream, a Tuple1 otherwise
+ final SplitStreamType splitRecord = new SplitStreamType<Integer>();
+ if (rawOutType1) {
+ splitRecord.streamId = "stream1";
+ splitRecord.value = 2;
+ } else {
+ splitRecord.streamId = "stream1";
+ splitRecord.value = new Tuple1<Integer>(2);
+ }
+ wrapper.processElement(record);
+ verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
+
+ // the same splitRecord instance is re-used (mutated) as expectation for the second element
+ if (rawOutType2) {
+ splitRecord.streamId = "stream2";
+ splitRecord.value = 3;
+ } else {
+ splitRecord.streamId = "stream2";
+ splitRecord.value = new Tuple1<Integer>(3);
+ }
+ wrapper.processElement(record);
+ verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
+ }
+
+ /**
+ * Verifies which configuration is passed to the bolt's prepare() when the wrapper is opened
+ * without global job parameters, with a StormConfig, and with a Flink Configuration.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOpen() throws Exception {
+ final StormConfig stormConfig = new StormConfig();
+ final Configuration flinkConfig = new Configuration();
+
+ // consecutive stubbing: successive calls return null, then stormConfig, then flinkConfig;
+ // the three open() calls below rely on this order (assumes open() asks once per call -- TODO confirm)
+ final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+ when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+ .thenReturn(flinkConfig);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
+
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+ declarer.declare(new Fields("dummy"));
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+ final IRichBolt bolt = mock(IRichBolt.class);
+
+ BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+ wrapper.setup(mock(Output.class), taskContext);
+
+ // test without configuration
+ wrapper.open(null);
+ verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+
+ // test with StormConfig
+ wrapper.open(null);
+ verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+ any(OutputCollector.class));
+
+ // test with Configuration
+ final TestDummyBolt testBolt = new TestDummyBolt();
+ wrapper = new BoltWrapper<Object, Object>(testBolt);
+ wrapper.setup(mock(Output.class), taskContext);
+
+ wrapper.open(null);
+ for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+ Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+ }
+ }
+
+ /**
+ * Same scenario as testOpen, but no SetupOutputFieldsDeclarer is substituted here and
+ * prepare() is expected to receive a null OutputCollector.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOpenSink() throws Exception {
+ final StormConfig stormConfig = new StormConfig();
+ final Configuration flinkConfig = new Configuration();
+
+ // consecutive stubbing: successive calls return null, then stormConfig, then flinkConfig
+ final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+ when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+ .thenReturn(flinkConfig);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
+
+ final IRichBolt bolt = mock(IRichBolt.class);
+
+ BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+ wrapper.setup(mock(Output.class), taskContext);
+
+ // test without configuration
+ wrapper.open(null);
+ verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
+ isNull(OutputCollector.class));
+
+ // test with StormConfig
+ wrapper.open(null);
+ verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+ isNull(OutputCollector.class));
+
+ // test with Configuration
+ final TestDummyBolt testBolt = new TestDummyBolt();
+ wrapper = new BoltWrapper<Object, Object>(testBolt);
+ wrapper.setup(mock(Output.class), taskContext);
+
+ wrapper.open(null);
+ for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+ Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+ }
+ }
+
+ /**
+ * Verifies that cleanup() is called on the wrapped bolt exactly once after the wrapper has
+ * been closed and disposed.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testClose() throws Exception {
+ final IRichBolt bolt = mock(IRichBolt.class);
+
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+ declarer.declare(new Fields("dummy"));
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+ final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+
+ final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+ wrapper.setup(mock(Output.class), taskContext);
+
+ wrapper.close();
+ wrapper.dispose();
+
+ verify(bolt).cleanup();
+ }
+
+ /**
+ * Bolt with two single-field output streams; it forwards the first input attribute (read as
+ * Integer) alternately to "stream1" (1st, 3rd, ... call) and "stream2" (2nd, 4th, ... call).
+ */
+ private static final class TestBolt implements IRichBolt {
+ private static final long serialVersionUID = 7278692872260138758L;
+ private OutputCollector collector;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ // number of execute() calls so far; its parity selects the output stream
+ int counter = 0;
+ @Override
+ public void execute(backtype.storm.tuple.Tuple input) {
+ if (++counter % 2 == 1) {
+ this.collector.emit("stream1", new Values(input.getInteger(0)));
+ } else {
+ this.collector.emit("stream2", new Values(input.getInteger(0)));
+ }
+ }
+
+ @Override
+ public void cleanup() {}
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream("stream1", new Fields("a1"));
+ declarer.declareStream("stream2", new Fields("a2"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..69d4a8e
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import java.util.HashMap;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.FlinkTopologyContext;
+import org.junit.Test;
+
+
+/**
+ * Verifies that the operations {@link FlinkTopologyContext} does not support are rejected with an
+ * {@link UnsupportedOperationException}.
+ *
+ * <p>FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested
+ * here, because those are tested in StormWrapperSetupHelperTest.
+ */
+public class FlinkTopologyContextTest extends AbstractTest {
+
+    /**
+     * Creates the context under test from an empty {@link StormTopology}; every other constructor argument is
+     * {@code null}. Each test calls this so that it works on a fresh instance.
+     *
+     * @return a new context backed by an empty topology
+     */
+    private static FlinkTopologyContext createContext() {
+        final StormTopology emptyTopology = new StormTopology(new HashMap<String, SpoutSpec>(),
+                new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>());
+        return new FlinkTopologyContext(emptyTopology, null, null,
+                null, null, null, null, null, null, null, null, null, null, null, null, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAddTaskHook() {
+        createContext().addTaskHook(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGetHooks() {
+        createContext().getHooks();
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRegisteredMetric1() {
+        // the cast selects the ICombiner overload of registerMetric
+        createContext().registerMetric(null, (ICombiner) null, 0);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRegisteredMetric2() {
+        // the cast selects the IReducer overload of registerMetric
+        createContext().registerMetric(null, (IReducer) null, 0);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRegisteredMetric3() {
+        // the cast selects the IMetric overload of registerMetric
+        createContext().registerMetric(null, (IMetric) null, 0);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGetRegisteredMetricByName() {
+        createContext().getRegisteredMetricByName(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSetAllSubscribedState() {
+        createContext().setAllSubscribedState(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSetSubscribedState1() {
+        createContext().setSubscribedState(null, null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSetSubscribedState2() {
+        createContext().setSubscribedState(null, null, null);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..4618101
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+/**
+ * Tests for {@link SetupOutputFieldsDeclarer}: the recorded number of attributes per declared stream, and the
+ * rejection of direct streams and of a {@code null} stream ID.
+ */
+public class SetupOutputFieldsDeclarerTest extends AbstractTest {
+
+    /** Declares the default stream and one named stream and checks the attribute count stored for each. */
+    @Test
+    public void testDeclare() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+
+        // default stream
+        final int defaultStreamArity = this.r.nextInt(26);
+        declarer.declare(createSchema(defaultStreamArity));
+        Assert.assertEquals(1, declarer.outputSchemas.size());
+        Assert.assertEquals(defaultStreamArity,
+                declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID).intValue());
+
+        // additional named stream
+        final String sid = "streamId";
+        final int namedStreamArity = this.r.nextInt(26);
+        declarer.declareStream(sid, createSchema(namedStreamArity));
+        Assert.assertEquals(2, declarer.outputSchemas.size());
+        Assert.assertEquals(namedStreamArity, declarer.outputSchemas.get(sid).intValue());
+    }
+
+    /** A non-direct declaration on the default stream must be accepted. */
+    @Test
+    public void testDeclareDirect() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declare(false, new Fields());
+    }
+
+    /** A direct declaration on the default stream must be rejected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDeclareDirectFail() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declare(true, new Fields());
+    }
+
+    /** Declaring the default stream by its ID must be accepted. */
+    @Test
+    public void testDeclareStream() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
+    }
+
+    /** A {@code null} stream ID must be rejected. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testDeclareStreamFail() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declareStream(null, new Fields());
+    }
+
+    /** The three-argument variant must accept a non-direct declaration of the default stream. */
+    @Test
+    public void testDeclareFullStream() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
+    }
+
+    /** The three-argument variant must reject a {@code null} stream ID. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testDeclareFullStreamFailNonDefaultStream() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declareStream(null, false, new Fields());
+    }
+
+    /** The three-argument variant must reject a direct stream. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDeclareFullStreamFailDirect() {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
+    }
+
+    /**
+     * Builds a schema with the attribute names "a0", "a1", ... up to the requested size.
+     *
+     * @param numberOfAttributes number of attributes the schema should contain
+     * @return the generated schema
+     */
+    private Fields createSchema(final int numberOfAttributes) {
+        final ArrayList<String> attributeNames = new ArrayList<String>(numberOfAttributes);
+        int next = 0;
+        while (next < numberOfAttributes) {
+            attributeNames.add("a" + next);
+            ++next;
+        }
+        return new Fields(attributeNames);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
new file mode 100644
index 0000000..6b60d2b
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.SpoutCollector;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link SpoutCollector}: emitted Storm tuples must reach the Flink {@link SourceContext} in the
+ * expected shape, and direct emits must be rejected.
+ */
+public class SpoutCollectorTest extends AbstractTest {
+
+    /**
+     * Emits one tuple for every declared attribute count from -1 to 25. For -1 the single raw value is expected
+     * at the Flink collector; for all other counts a Flink tuple of that arity holding the same values is
+     * expected. {@code emit} must return {@code null} in every case.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
+        for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
+            final SourceContext flinkCollector = mock(SourceContext.class);
+            Tuple flinkTuple = null;
+            final Values tuple = new Values();
+
+            final String streamId = "streamId";
+            final HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+            attributes.put(streamId, numberOfAttributes);
+
+            // the collector is built the same way for both cases, so it is created once up front
+            final SpoutCollector<?> collector = new SpoutCollector(attributes, flinkCollector);
+
+            if (numberOfAttributes == -1) {
+                // raw (non-tuple) output: a single value
+                tuple.add(Integer.valueOf(this.r.nextInt()));
+            } else {
+                flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+                for (int i = 0; i < numberOfAttributes; ++i) {
+                    tuple.add(Integer.valueOf(this.r.nextInt()));
+                    flinkTuple.setField(tuple.get(i), i);
+                }
+            }
+
+            final Object messageId = Integer.valueOf(this.r.nextInt());
+            final List<Integer> taskIds = collector.emit(streamId, tuple, messageId);
+
+            Assert.assertNull(taskIds);
+
+            if (numberOfAttributes == -1) {
+                verify(flinkCollector).collect(tuple.get(0));
+            } else {
+                verify(flinkCollector).collect(flinkTuple);
+            }
+        }
+    }
+
+    /** Direct emits are expected to be rejected with an {@link UnsupportedOperationException}. */
+    @SuppressWarnings("unchecked")
+    @Test(expected = UnsupportedOperationException.class)
+    public void testEmitDirect() {
+        new SpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect(
+                0, null, null,
+                (Object) null);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
new file mode 100644
index 0000000..227d736
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.FiniteSpout;
+import org.apache.flink.storm.util.FiniteTestSpout;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SpoutWrapper}: configuration hand-over to the wrapped spout, the number of
+ * {@code nextTuple()} invocations for the different constructor variants, cancellation, and closing.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WrapperSetupHelper.class)
+public class SpoutWrapperTest extends AbstractTest {
+
+    /**
+     * Creates a mocked runtime context that returns the given execution config, an empty task configuration,
+     * and the task name "name".
+     *
+     * @param executionConfig the execution config the mocked context should hand out
+     * @return the mocked runtime context
+     */
+    private static StreamingRuntimeContext createTaskContext(final ExecutionConfig executionConfig) {
+        final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+        when(taskContext.getExecutionConfig()).thenReturn(executionConfig);
+        when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+        when(taskContext.getTaskName()).thenReturn("name");
+        return taskContext;
+    }
+
+    /**
+     * Runs the wrapper three times with different global job parameters (none, a {@link StormConfig}, a Flink
+     * {@link Configuration}) and checks what is passed to the spout's {@code open} method.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testRunPrepare() throws Exception {
+        final StormConfig stormConfig = new StormConfig();
+        stormConfig.put(this.r.nextInt(), this.r.nextInt());
+        final Configuration flinkConfig = new Configuration();
+        flinkConfig.setInteger("testKey", this.r.nextInt());
+
+        // consecutive calls return: no parameters, then the Storm config, then the Flink config
+        final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+        when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+                .thenReturn(flinkConfig);
+
+        final StreamingRuntimeContext taskContext = createTaskContext(taskConfig);
+
+        final IRichSpout spout = mock(IRichSpout.class);
+        SpoutWrapper spoutWrapper = new SpoutWrapper(spout);
+        spoutWrapper.setRuntimeContext(taskContext);
+        // NOTE(review): cancelling up front presumably makes run() return right after opening the spout
+        spoutWrapper.cancel();
+
+        // test without configuration
+        spoutWrapper.run(mock(SourceContext.class));
+        verify(spout).open(any(Map.class), any(TopologyContext.class),
+                any(SpoutOutputCollector.class));
+
+        // test with StormConfig
+        spoutWrapper.run(mock(SourceContext.class));
+        verify(spout).open(eq(stormConfig), any(TopologyContext.class),
+                any(SpoutOutputCollector.class));
+
+        // test with Configuration
+        final TestDummySpout testSpout = new TestDummySpout();
+        spoutWrapper = new SpoutWrapper(testSpout);
+        spoutWrapper.setRuntimeContext(taskContext);
+        spoutWrapper.cancel();
+
+        spoutWrapper.run(mock(SourceContext.class));
+        for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+            Assert.assertEquals(entry.getValue(), testSpout.config.get(entry.getKey()));
+        }
+    }
+
+    /** A wrapper created with a fixed number of calls must invoke {@code nextTuple()} exactly that often. */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testRunExecuteFixedNumber() throws Exception {
+        final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+        declarer.declare(new Fields("dummy"));
+        PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments()
+                .thenReturn(declarer);
+
+        final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+        final IRichSpout spout = mock(IRichSpout.class);
+        final int numberOfCalls = this.r.nextInt(50);
+        final SpoutWrapper<?> spoutWrapper = new SpoutWrapper<Object>(spout,
+                numberOfCalls);
+        spoutWrapper.setRuntimeContext(taskContext);
+
+        spoutWrapper.run(mock(SourceContext.class));
+        verify(spout, times(numberOfCalls)).nextTuple();
+    }
+
+    /**
+     * Wraps a {@link FiniteTestSpout} and expects the collected output to be the values
+     * {@code numberOfCalls - 1} down to 0, each wrapped in a {@link Tuple1}.
+     */
+    @Test
+    public void testRunExecuteFinite() throws Exception {
+        final int numberOfCalls = this.r.nextInt(50);
+
+        final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
+        for (int i = numberOfCalls - 1; i >= 0; --i) {
+            expectedResult.add(new Tuple1<Integer>(Integer.valueOf(i)));
+        }
+
+        final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+        final FiniteTestSpout spout = new FiniteTestSpout(numberOfCalls);
+        final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(
+                spout, -1);
+        spoutWrapper.setRuntimeContext(taskContext);
+
+        final TestContext collector = new TestContext();
+        spoutWrapper.run(collector);
+
+        Assert.assertEquals(expectedResult, collector.result);
+    }
+
+    /** {@code reachedEnd()} first returns {@code true} on its fourth call, so three tuples are requested. */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void runAndExecuteFiniteSpout() throws Exception {
+        final FiniteSpout stormSpout = mock(FiniteSpout.class);
+        when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
+
+        final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+        final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
+        wrapper.setRuntimeContext(taskContext);
+
+        wrapper.run(mock(SourceContext.class));
+        verify(stormSpout, times(3)).nextTuple();
+    }
+
+    /** {@code reachedEnd()} returns {@code true} on its first call, so no tuple is requested at all. */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void runAndExecuteFiniteSpout2() throws Exception {
+        final FiniteSpout stormSpout = mock(FiniteSpout.class);
+        when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
+
+        final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+        final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
+        wrapper.setRuntimeContext(taskContext);
+
+        wrapper.run(mock(SourceContext.class));
+        verify(stormSpout, never()).nextTuple();
+    }
+
+    /** A wrapper that is cancelled before {@code run} is called must not emit anything. */
+    @Test
+    public void testCancel() throws Exception {
+        final int numberOfCalls = 5 + this.r.nextInt(5);
+
+        final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+        final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+
+        final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout);
+        spoutWrapper.setRuntimeContext(taskContext);
+
+        spoutWrapper.cancel();
+        final TestContext collector = new TestContext();
+        spoutWrapper.run(collector);
+
+        Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
+    }
+
+    /** Closing the wrapper must close the wrapped spout. */
+    @Test
+    public void testClose() throws Exception {
+        final IRichSpout spout = mock(IRichSpout.class);
+        final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout);
+
+        spoutWrapper.close();
+
+        verify(spout).close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
new file mode 100644
index 0000000..155fcd9
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.StormTuple;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+public class StormTupleTest extends AbstractTest {
+ // Attribute name passed to getValueByField(...) in the schema-based (Flink tuple) tests.
+ private static final String fieldName = "fieldName";
+ // Attribute name passed to getValueByField(...) in the POJO-based tests.
+ private static final String fieldNamePojo = "member";
+
+ // Random tuple arity and a random field index below it; both are re-drawn in prepare() before each test.
+ private int arity, index;
+
+ /**
+  * Runs the parent's preparation and then draws a random tuple arity in [1, 25] together with a random field
+  * index in [0, arity).
+  */
+ @Override
+ @Before
+ public void prepare() {
+     super.prepare();
+
+     final int randomArity = this.r.nextInt(25) + 1;
+     this.arity = randomArity;
+     this.index = this.r.nextInt(randomArity);
+ }
+
+ /** A non-tuple input must be exposed as a Storm tuple with exactly one value: the input object itself. */
+ @Test
+ public void nonTupleTest() {
+     final Object rawValue = Integer.valueOf(this.r.nextInt());
+     final StormTuple<Object> stormTuple = new StormTuple<Object>(rawValue, null);
+
+     Assert.assertSame(rawValue, stormTuple.getValue(0));
+
+     final List<Object> allValues = stormTuple.getValues();
+     Assert.assertEquals(1, allValues.size());
+     Assert.assertEquals(rawValue, allValues.get(0));
+ }
+
+ /** A Flink tuple of random arity (0 to 25) must be exposed with the same number of values in the same order. */
+ @Test
+ public void tupleTest() throws InstantiationException, IllegalAccessException {
+     final int arityUnderTest = this.r.nextInt(26);
+     final Object[] attributes = new Object[arityUnderTest];
+     final Tuple flinkTuple = Tuple.getTupleClass(arityUnderTest).newInstance();
+
+     for (int pos = 0; pos < arityUnderTest; pos++) {
+         attributes[pos] = this.r.nextInt();
+         flinkTuple.setField(attributes[pos], pos);
+     }
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final List<Object> actualValues = stormTuple.getValues();
+     Assert.assertEquals(arityUnderTest, actualValues.size());
+
+     for (int pos = 0; pos < arityUnderTest; pos++) {
+         final Object expected = flinkTuple.getField(pos);
+         Assert.assertEquals(expected, actualValues.get(pos));
+     }
+
+     Assert.assertEquals(arityUnderTest, stormTuple.size());
+ }
+
+ /** {@code getBinary} must return the byte array stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testBinary() {
+     final byte[] payload = new byte[this.r.nextInt(15)];
+     this.r.nextBytes(payload);
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(payload, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getBinary(pos));
+ }
+
+ /** A raw {@link Boolean} input must be readable via {@code getBoolean(0)}. */
+ @Test
+ public void testBoolean() {
+     final Boolean expected = Boolean.valueOf(this.r.nextBoolean());
+     final StormTuple<Boolean> stormTuple = new StormTuple<Boolean>(expected, null);
+
+     final Boolean actual = stormTuple.getBoolean(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** A raw {@link Byte} input must be readable via {@code getByte(0)}. */
+ @Test
+ public void testByte() {
+     final Byte expected = Byte.valueOf((byte) this.r.nextInt());
+     final StormTuple<Byte> stormTuple = new StormTuple<Byte>(expected, null);
+
+     final Byte actual = stormTuple.getByte(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** A raw {@link Double} input must be readable via {@code getDouble(0)}. */
+ @Test
+ public void testDouble() {
+     final Double expected = Double.valueOf(this.r.nextDouble());
+     final StormTuple<Double> stormTuple = new StormTuple<Double>(expected, null);
+
+     final Double actual = stormTuple.getDouble(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** A raw {@link Float} input must be readable via {@code getFloat(0)}. */
+ @Test
+ public void testFloat() {
+     final Float expected = Float.valueOf(this.r.nextFloat());
+     final StormTuple<Float> stormTuple = new StormTuple<Float>(expected, null);
+
+     final Float actual = stormTuple.getFloat(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** A raw {@link Integer} input must be readable via {@code getInteger(0)}. */
+ @Test
+ public void testInteger() {
+     final Integer expected = Integer.valueOf(this.r.nextInt());
+     final StormTuple<Integer> stormTuple = new StormTuple<Integer>(expected, null);
+
+     final Integer actual = stormTuple.getInteger(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** A raw {@link Long} input must be readable via {@code getLong(0)}. */
+ @Test
+ public void testLong() {
+     final Long expected = Long.valueOf(this.r.nextLong());
+     final StormTuple<Long> stormTuple = new StormTuple<Long>(expected, null);
+
+     final Long actual = stormTuple.getLong(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** A raw {@link Short} input must be readable via {@code getShort(0)}. */
+ @Test
+ public void testShort() {
+     final Short expected = Short.valueOf((short) this.r.nextInt());
+     final StormTuple<Short> stormTuple = new StormTuple<Short>(expected, null);
+
+     final Short actual = stormTuple.getShort(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** A raw {@link String} input (decoded from up to 14 random bytes) must be readable via {@code getString(0)}. */
+ @Test
+ public void testString() {
+     final byte[] randomBytes = new byte[this.r.nextInt(15)];
+     this.r.nextBytes(randomBytes);
+     final String expected = new String(randomBytes);
+
+     final StormTuple<String> stormTuple = new StormTuple<String>(expected, null);
+
+     final String actual = stormTuple.getString(0);
+     Assert.assertEquals(expected, actual);
+ }
+
+ /** {@code getBinary} must return the byte array stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testBinaryTuple() {
+     final byte[] payload = new byte[this.r.nextInt(15)];
+     this.r.nextBytes(payload);
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(payload, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getBinary(pos));
+ }
+
+ /** {@code getBoolean} must return the value stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testBooleanTuple() {
+     final Boolean stored = this.r.nextBoolean();
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getBoolean(pos));
+ }
+
+ /** {@code getByte} must return the value stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testByteTuple() {
+     final Byte stored = (byte) this.r.nextInt();
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getByte(pos));
+ }
+
+ /** {@code getDouble} must return the value stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testDoubleTuple() {
+     final Double stored = this.r.nextDouble();
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getDouble(pos));
+ }
+
+ /** {@code getFloat} must return the value stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testFloatTuple() {
+     final Float stored = this.r.nextFloat();
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getFloat(pos));
+ }
+
+ /** {@code getInteger} must return the value stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testIntegerTuple() {
+     final Integer stored = this.r.nextInt();
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getInteger(pos));
+ }
+
+ /** {@code getLong} must return the value stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testLongTuple() {
+     final Long stored = this.r.nextLong();
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getLong(pos));
+ }
+
+ /** {@code getShort} must return the value stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testShortTuple() {
+     final Short stored = (short) this.r.nextInt();
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getShort(pos));
+ }
+
+ /** {@code getString} must return the string stored at a random position of a five-field Flink tuple. */
+ @Test
+ public void testStringTuple() {
+     final byte[] randomBytes = new byte[this.r.nextInt(15)];
+     this.r.nextBytes(randomBytes);
+     final String stored = new String(randomBytes);
+
+     final int pos = this.r.nextInt(5);
+     final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+     flinkTuple.setField(stored, pos);
+
+     final StormTuple<Tuple> stormTuple = new StormTuple<Tuple>(flinkTuple, null);
+     final Object expected = flinkTuple.getField(pos);
+     Assert.assertEquals(expected, stormTuple.getString(pos));
+ }
+
+ /** {@code contains} must report exactly the attribute names that are part of the schema. */
+ @Test
+ public void testContains() throws Exception {
+     final Fields twoAttributes = new Fields("a1", "a2");
+     final Tuple flinkTuple = Tuple.getTupleClass(1).newInstance();
+     final StormTuple<Object> stormTuple = new StormTuple<Object>(flinkTuple, twoAttributes);
+
+     Assert.assertTrue(stormTuple.contains("a1"));
+     Assert.assertTrue(stormTuple.contains("a2"));
+     Assert.assertFalse(stormTuple.contains("a3"));
+ }
+
+ /**
+  * {@code getFields} must hand back the very schema instance given for a Flink tuple input, and {@code null}
+  * when the wrapped input is {@code null}.
+  */
+ @Test
+ public void testGetFields() throws Exception {
+     final Fields emptySchema = new Fields();
+
+     final StormTuple<Object> withTuple = new StormTuple<Object>(
+             Tuple.getTupleClass(1).newInstance(), emptySchema);
+     Assert.assertSame(emptySchema, withTuple.getFields());
+
+     final StormTuple<Object> withNull = new StormTuple<Object>(null, emptySchema);
+     Assert.assertNull(withNull.getFields());
+ }
+
+ @Test
+ public void testFieldIndex() throws Exception {
+ // fieldIndex() must report the position at which each name was declared in the schema.
+ final Fields declared = new Fields("a1", "a2");
+ final Tuple flinkTuple = Tuple.getTupleClass(1).newInstance();
+ final StormTuple<Object> wrapped = new StormTuple<Object>(flinkTuple, declared);
+ Assert.assertEquals(0, wrapped.fieldIndex("a1"));
+ Assert.assertEquals(1, wrapped.fieldIndex("a2"));
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testSelect() throws Exception {
+ // Field i holds the value i and is named "a" + i; r decides which fields get selected.
+ final Tuple flinkTuple = Tuple.getTupleClass(arity).newInstance();
+ final Values expected = new Values();
+ final ArrayList<String> allNames = new ArrayList<String>(arity);
+ final ArrayList<String> selectedNames = new ArrayList<String>(arity);
+ for (int pos = 0; pos < arity; ++pos) {
+ final String name = "a" + pos;
+ flinkTuple.setField(pos, pos);
+ allNames.add(name);
+ if (r.nextBoolean()) {
+ selectedNames.add(name);
+ expected.add(pos);
+ }
+ }
+ // select() must return the values of the chosen attributes in the order they were picked.
+ final Fields schema = new Fields(allNames);
+ final Fields selector = new Fields(selectedNames);
+ final StormTuple wrapped = new StormTuple(flinkTuple, schema);
+ Assert.assertEquals(expected, wrapped.select(selector));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetValueByField() throws Exception {
+ // The object stored under fieldName must come back as the identical instance.
+ final Object expected = mock(Object.class);
+ final StormTuple wrapped = testGetByField(arity, index, expected);
+ Assert.assertSame(expected, wrapped.getValueByField(fieldName));
+ }
+
+ @Test
+ public void testGetValueByFieldPojo() throws Exception {
+ // No schema: the value is looked up by fieldNamePojo on a POJO with a public field.
+ final Object expected = mock(Object.class);
+ final TestPojoMember<Object> holder = new TestPojoMember<Object>(expected);
+ final StormTuple<TestPojoMember<Object>> wrapped = new StormTuple<TestPojoMember<Object>>(holder, null);
+ Assert.assertSame(expected, wrapped.getValueByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetValueByFieldPojoGetter() throws Exception {
+ // No schema: the value is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Object expected = mock(Object.class);
+ final TestPojoGetter<Object> holder = new TestPojoGetter<Object>(expected);
+ final StormTuple<TestPojoGetter<Object>> wrapped = new StormTuple<TestPojoGetter<Object>>(holder, null);
+ Assert.assertSame(expected, wrapped.getValueByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetStringByField() throws Exception {
+ // The String stored under fieldName must come back as the identical instance.
+ final String expected = "stringValue";
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getStringByField(fieldName));
+ }
+
+ @Test
+ public void testGetStringByFieldPojo() throws Exception {
+ // No schema: the String is looked up by fieldNamePojo on a POJO with a public field.
+ final String expected = "stringValue";
+ final TestPojoMember<String> holder = new TestPojoMember<String>(expected);
+ final StormTuple<TestPojoMember<String>> wrapped = new StormTuple<TestPojoMember<String>>(holder, null);
+ Assert.assertSame(expected, wrapped.getStringByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetStringByFieldPojoGetter() throws Exception {
+ // No schema: the String is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final String expected = "stringValue";
+ final TestPojoGetter<String> holder = new TestPojoGetter<String>(expected);
+ final StormTuple<TestPojoGetter<String>> wrapped = new StormTuple<TestPojoGetter<String>>(holder, null);
+ Assert.assertSame(expected, wrapped.getStringByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetIntegerByField() throws Exception {
+ // The boxed Integer stored under fieldName must come back as the identical instance.
+ final Integer expected = r.nextInt();
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getIntegerByField(fieldName));
+ }
+
+ @Test
+ public void testGetIntegerByFieldPojo() throws Exception {
+ // No schema: the Integer is looked up by fieldNamePojo on a POJO with a public field.
+ final Integer expected = r.nextInt();
+ final TestPojoMember<Integer> holder = new TestPojoMember<Integer>(expected);
+ final StormTuple<TestPojoMember<Integer>> wrapped = new StormTuple<TestPojoMember<Integer>>(holder, null);
+ Assert.assertSame(expected, wrapped.getIntegerByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetIntegerByFieldPojoGetter() throws Exception {
+ // No schema: the Integer is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Integer expected = r.nextInt();
+ final TestPojoGetter<Integer> holder = new TestPojoGetter<Integer>(expected);
+ final StormTuple<TestPojoGetter<Integer>> wrapped = new StormTuple<TestPojoGetter<Integer>>(holder, null);
+ Assert.assertSame(expected, wrapped.getIntegerByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetLongByField() throws Exception {
+ // The boxed Long stored under fieldName must come back as the identical instance.
+ final Long expected = r.nextLong();
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getLongByField(fieldName));
+ }
+
+ @Test
+ public void testGetLongByFieldPojo() throws Exception {
+ // No schema: the Long is looked up by fieldNamePojo on a POJO with a public field.
+ final Long expected = r.nextLong();
+ final TestPojoMember<Long> holder = new TestPojoMember<Long>(expected);
+ final StormTuple<TestPojoMember<Long>> wrapped = new StormTuple<TestPojoMember<Long>>(holder, null);
+ Assert.assertSame(expected, wrapped.getLongByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetLongByFieldPojoGetter() throws Exception {
+ // No schema: the Long is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Long expected = r.nextLong();
+ final TestPojoGetter<Long> holder = new TestPojoGetter<Long>(expected);
+ final StormTuple<TestPojoGetter<Long>> wrapped = new StormTuple<TestPojoGetter<Long>>(holder, null);
+ Assert.assertSame(expected, wrapped.getLongByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetBooleanByField() throws Exception {
+ // The Boolean stored under fieldName must come back as an equal value.
+ final Boolean expected = r.nextBoolean();
+ Assert.assertEquals(expected, testGetByField(arity, index, expected).getBooleanByField(fieldName));
+ }
+
+ @Test
+ public void testGetBooleanByFieldPojo() throws Exception {
+ // No schema: the Boolean is looked up by fieldNamePojo on a POJO with a public field.
+ final Boolean expected = r.nextBoolean();
+ final TestPojoMember<Boolean> holder = new TestPojoMember<Boolean>(expected);
+ final StormTuple<TestPojoMember<Boolean>> wrapped = new StormTuple<TestPojoMember<Boolean>>(holder, null);
+ Assert.assertSame(expected, wrapped.getBooleanByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetBooleanByFieldPojoGetter() throws Exception {
+ // No schema: the Boolean is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Boolean expected = r.nextBoolean();
+ final TestPojoGetter<Boolean> holder = new TestPojoGetter<Boolean>(expected);
+ final StormTuple<TestPojoGetter<Boolean>> wrapped = new StormTuple<TestPojoGetter<Boolean>>(holder, null);
+ Assert.assertSame(expected, wrapped.getBooleanByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetShortByField() throws Exception {
+ // The boxed Short stored under fieldName must come back as the identical instance.
+ final Short expected = (short) r.nextInt();
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getShortByField(fieldName));
+ }
+
+ @Test
+ public void testGetShortByFieldPojo() throws Exception {
+ // No schema: the Short is looked up by fieldNamePojo on a POJO with a public field.
+ final Short expected = (short) r.nextInt();
+ final TestPojoMember<Short> holder = new TestPojoMember<Short>(expected);
+ final StormTuple<TestPojoMember<Short>> wrapped = new StormTuple<TestPojoMember<Short>>(holder, null);
+ Assert.assertSame(expected, wrapped.getShortByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetShortByFieldPojoGetter() throws Exception {
+ // No schema: the Short is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Short expected = (short) r.nextInt();
+ final TestPojoGetter<Short> holder = new TestPojoGetter<Short>(expected);
+ final StormTuple<TestPojoGetter<Short>> wrapped = new StormTuple<TestPojoGetter<Short>>(holder, null);
+ Assert.assertSame(expected, wrapped.getShortByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetByteByField() throws Exception {
+ // The boxed Byte stored under fieldName must come back as the identical instance.
+ final Byte expected = Byte.valueOf((byte) r.nextInt());
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getByteByField(fieldName));
+ }
+
+ @Test
+ public void testGetByteByFieldPojo() throws Exception {
+ // No schema: the Byte is looked up by fieldNamePojo on a POJO with a public field.
+ final Byte expected = Byte.valueOf((byte) r.nextInt());
+ final TestPojoMember<Byte> holder = new TestPojoMember<Byte>(expected);
+ final StormTuple<TestPojoMember<Byte>> wrapped = new StormTuple<TestPojoMember<Byte>>(holder, null);
+ Assert.assertSame(expected, wrapped.getByteByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetByteByFieldPojoGetter() throws Exception {
+ // No schema: the Byte is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Byte expected = Byte.valueOf((byte) r.nextInt());
+ final TestPojoGetter<Byte> holder = new TestPojoGetter<Byte>(expected);
+ final StormTuple<TestPojoGetter<Byte>> wrapped = new StormTuple<TestPojoGetter<Byte>>(holder, null);
+ Assert.assertSame(expected, wrapped.getByteByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetDoubleByField() throws Exception {
+ // The boxed Double stored under fieldName must come back as the identical instance.
+ final Double expected = r.nextDouble();
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getDoubleByField(fieldName));
+ }
+
+ @Test
+ public void testGetDoubleByFieldPojo() throws Exception {
+ // No schema: the Double is looked up by fieldNamePojo on a POJO with a public field.
+ final Double expected = r.nextDouble();
+ final TestPojoMember<Double> holder = new TestPojoMember<Double>(expected);
+ final StormTuple<TestPojoMember<Double>> wrapped = new StormTuple<TestPojoMember<Double>>(holder, null);
+ Assert.assertSame(expected, wrapped.getDoubleByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetDoubleByFieldPojoGetter() throws Exception {
+ // No schema: the Double is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Double expected = r.nextDouble();
+ final TestPojoGetter<Double> holder = new TestPojoGetter<Double>(expected);
+ final StormTuple<TestPojoGetter<Double>> wrapped = new StormTuple<TestPojoGetter<Double>>(holder, null);
+ Assert.assertSame(expected, wrapped.getDoubleByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetFloatByField() throws Exception {
+ // The boxed Float stored under fieldName must come back as the identical instance.
+ final Float expected = r.nextFloat();
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getFloatByField(fieldName));
+ }
+
+ @Test
+ public void testGetFloatByFieldPojo() throws Exception {
+ // No schema: the Float is looked up by fieldNamePojo on a POJO with a public field.
+ final Float expected = r.nextFloat();
+ final TestPojoMember<Float> holder = new TestPojoMember<Float>(expected);
+ final StormTuple<TestPojoMember<Float>> wrapped = new StormTuple<TestPojoMember<Float>>(holder, null);
+ Assert.assertSame(expected, wrapped.getFloatByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetFloatByFieldPojoGetter() throws Exception {
+ // No schema: the Float is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final Float expected = r.nextFloat();
+ final TestPojoGetter<Float> holder = new TestPojoGetter<Float>(expected);
+ final StormTuple<TestPojoGetter<Float>> wrapped = new StormTuple<TestPojoGetter<Float>>(holder, null);
+ Assert.assertSame(expected, wrapped.getFloatByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGetBinaryByField() throws Exception {
+ // The non-empty byte array stored under fieldName must come back as the identical array.
+ final byte[] expected = new byte[1 + r.nextInt(20)];
+ r.nextBytes(expected);
+ Assert.assertSame(expected, testGetByField(arity, index, expected).getBinaryByField(fieldName));
+ }
+
+ @Test
+ public void testGetBinaryFieldPojo() throws Exception {
+ // No schema: the byte array is looked up by fieldNamePojo on a POJO with a public field.
+ final byte[] expected = new byte[1 + r.nextInt(20)];
+ r.nextBytes(expected);
+ final TestPojoMember<byte[]> holder = new TestPojoMember<byte[]>(expected);
+ final StormTuple<TestPojoMember<byte[]>> wrapped = new StormTuple<TestPojoMember<byte[]>>(holder, null);
+ Assert.assertSame(expected, wrapped.getBinaryByField(fieldNamePojo));
+ }
+
+ @Test
+ public void testGetBinaryByFieldPojoGetter() throws Exception {
+ // No schema: the byte array is looked up by fieldNamePojo on a POJO with a private field and a getter.
+ final byte[] expected = new byte[1 + r.nextInt(20)];
+ r.nextBytes(expected);
+ final TestPojoGetter<byte[]> holder = new TestPojoGetter<byte[]>(expected);
+ final StormTuple<TestPojoGetter<byte[]>> wrapped = new StormTuple<TestPojoGetter<byte[]>>(holder, null);
+ Assert.assertSame(expected, wrapped.getBinaryByField(fieldNamePojo));
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private <T> StormTuple testGetByField(int arity, int index, T value)
+ throws Exception {
+
+ assert (index < arity);
+
+ Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+ tuple.setField(value, index);
+
+ ArrayList<String> attributeNames = new ArrayList<String>(arity);
+ for(int i = 0; i < arity; ++i) {
+ if(i == index) {
+ attributeNames.add(fieldName);
+ } else {
+ attributeNames.add("" + i);
+ }
+ }
+ Fields schema = new Fields(attributeNames);
+
+ return new StormTuple(tuple, schema);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetSourceGlobalStreamid() {
+ new StormTuple<Object>(null, null).getSourceGlobalStreamid(); // expected to throw
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetSourceComponent() {
+ new StormTuple<Object>(null, null).getSourceComponent(); // expected to throw
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetSourceTask() {
+ new StormTuple<Object>(null, null).getSourceTask(); // expected to throw
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetSourceStreamId() {
+ new StormTuple<Object>(null, null).getSourceStreamId(); // expected to throw
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetMessageId() {
+ new StormTuple<Object>(null, null).getMessageId(); // expected to throw
+ }
+
+ /** Test POJO whose value is exposed only through the public field {@code member}. */
+ public static class TestPojoMember<T> {
+ public T member;
+ public TestPojoMember(T value) {
+ member = value;
+ }
+ }
+
+ /** Test POJO whose private value is reachable only through {@link #getMember()}. */
+ public static class TestPojoGetter<T> {
+ private T member;
+
+ public TestPojoGetter(T value) {
+ member = value;
+ }
+ public T getMember() {
+ return member;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
new file mode 100644
index 0000000..4c4749a
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.LinkedList;
+
+/** {@link SourceContext} used in tests: stores a copy of every collected tuple in {@code result}. */
+class TestContext implements SourceContext<Tuple1<Integer>> {
+ public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
+
+ public TestContext() {}
+
+ @Override
+ public void collect(final Tuple1<Integer> record) {
+ result.add(record.copy());
+ }
+
+ @Override
+ public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {
+ result.add(element.copy()); // the timestamp is not recorded
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ // watermarks are dropped
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return null; // no lock object is provided
+ }
+
+ @Override
+ public void close() {
+ // nothing to release
+ }
+}