You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:04 UTC
[14/27] flink git commit: [storm-compat] adapted layer to new
streaming API
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 48f680a..6d2f196 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -45,7 +45,7 @@ public class StormSpoutWrapperTest extends AbstractTest {
spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
spoutWrapper.cancel();
- final TestCollector collector = new TestCollector();
+ final TestContext collector = new TestContext();
spoutWrapper.run(collector);
Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java
deleted file mode 100644
index 2d85597..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-import java.util.LinkedList;
-
-class TestCollector implements Collector<Tuple1<Integer>> {
- public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
-
- public TestCollector() {
- }
-
- @Override
- public void collect(final Tuple1<Integer> record) {
- this.result.add(record.copy());
- }
-
- @Override
- public void close() {/* nothing to to */}
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
new file mode 100644
index 0000000..8885a1b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import java.util.LinkedList;
+
+class TestContext implements SourceContext<Tuple1<Integer>> {
+ public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
+
+ public TestContext() {
+ }
+
+ @Override
+ public void collect(final Tuple1<Integer> record) {
+ this.result.add(record.copy());
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
index cae50b7..79c7125 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.stormcompatibility.singlejoin;
import backtype.storm.tuple.Fields;
@@ -27,10 +44,10 @@ public class SingleJoinTopology {
builder.setSpout(spoutId2, new AgeSpout(new Fields("id", "age")));
builder.setBolt(boltId, new SingleJoinBolt(new Fields("gender", "age")))
- .fieldsGrouping(spoutId1, new Fields("id"))
- .fieldsGrouping(spoutId2, new Fields("id"));
- //.shuffleGrouping(spoutId1)
- //.shuffleGrouping(spoutId2);
+ .fieldsGrouping(spoutId1, new Fields("id"))
+ .fieldsGrouping(spoutId2, new Fields("id"));
+ //.shuffleGrouping(spoutId1)
+ //.shuffleGrouping(spoutId2);
// emit result
if (fileInputOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
index 8d66d29..d70914a 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
@@ -1,9 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.stormcompatibility.singlejoin;
import backtype.storm.utils.Utils;
import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.wordcount.WordCountTopology;
public class StormSingleJoinLocal {
public final static String topologyId = "Streaming SingleJoin";
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
index 4edc133..49761c3 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
import backtype.storm.topology.OutputFieldsDeclarer;
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
index ac25917..d507998 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
import backtype.storm.topology.OutputFieldsDeclarer;
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
index 3df1618..cd53140 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
@@ -1,6 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-import backtype.storm.Config;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
index 939380c..b121744 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
@@ -34,9 +34,9 @@ public abstract class AbstractStormBoltSink implements IRichBolt {
private StringBuilder lineBuilder;
private String prefix = "";
- private OutputFormatter formatter;
+ private final OutputFormatter formatter;
- public AbstractStormBoltSink(OutputFormatter formatter) {
+ public AbstractStormBoltSink(final OutputFormatter formatter) {
this.formatter = formatter;
}
@@ -56,14 +56,7 @@ public abstract class AbstractStormBoltSink implements IRichBolt {
public final void execute(final Tuple input) {
this.lineBuilder = new StringBuilder();
this.lineBuilder.append(this.prefix);
- lineBuilder.append(formatter.format(input));
-// this.lineBuilder.append("(");
-// for (final Object attribute : input.getValues()) {
-// this.lineBuilder.append(attribute);
-// this.lineBuilder.append(",");
-// }
-// this.lineBuilder.replace(this.lineBuilder.length() - 1, this.lineBuilder.length(), ")");
-
+ this.lineBuilder.append(this.formatter.format(input));
this.writeExternal(this.lineBuilder.toString());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
index 0de7b5c..bfc3135 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
@@ -18,8 +18,12 @@
package org.apache.flink.stormcompatibility.util;
-public interface OutputFormatter {
+import java.io.Serializable;
- public String format(Object input);
+import backtype.storm.tuple.Tuple;
+
+public interface OutputFormatter extends Serializable {
+
+ public String format(Tuple input);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
index fca5fcc..a9d72d9 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -18,10 +18,14 @@
package org.apache.flink.stormcompatibility.util;
+import backtype.storm.tuple.Tuple;
+
public class SimpleOutputFormatter implements OutputFormatter {
+ private static final long serialVersionUID = 6349573860144270338L;
@Override
- public String format(Object input) {
- return input.toString();
+ public String format(final Tuple input) {
+ return input.getValue(0).toString();
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
index 86b55aa..a92bc6a 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
@@ -34,7 +34,7 @@ public final class StormBoltFileSink extends AbstractStormBoltSink {
private final String path;
private BufferedWriter writer;
- public StormBoltFileSink(final String path, OutputFormatter formatter) {
+ public StormBoltFileSink(final String path, final OutputFormatter formatter) {
super(formatter);
this.path = path;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
index f22fab6..6419ee3 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
@@ -20,16 +20,14 @@ package org.apache.flink.stormcompatibility.util;
import backtype.storm.tuple.Tuple;
-import java.io.Serializable;
-
-public class TupleOutputFormatter implements OutputFormatter, Serializable {
+public class TupleOutputFormatter implements OutputFormatter {
+ private static final long serialVersionUID = -599665757723851761L;
@Override
- public String format(Object input) {
- Tuple inputTuple = (Tuple) input;
- StringBuilder stringBuilder = new StringBuilder();
+ public String format(final Tuple input) {
+ final StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("(");
- for (final Object attribute : inputTuple.getValues()) {
+ for (final Object attribute : input.getValues()) {
stringBuilder.append(attribute);
stringBuilder.append(",");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
index a6b64d2..7b4f471 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
@@ -65,11 +65,11 @@ public class StormWordCountLocal {
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());
- Utils.sleep(100 * 1000);
+ Utils.sleep(5 * 1000);
// TODO kill does no do anything so far
- //cluster.killTopology(topologyId);
- //cluster.shutdown();
+ cluster.killTopology(topologyId);
+ cluster.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
index e384f34..75dd5fc 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
@@ -36,7 +36,7 @@ public class StormBoltExclamationITCase extends StreamingProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
index e3813c4..d6bcf30 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -39,7 +39,7 @@ public class StormExclamationLocalITCase extends StreamingProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
index 06c5d9a..2b08b4b 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
@@ -36,7 +36,7 @@ public class StormSpoutExclamationITCase extends StreamingProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
index 2a5a0b3..9228474 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
@@ -34,7 +34,7 @@ public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- this.compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
index d08b560..9d7b869 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
@@ -34,7 +34,7 @@ public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- this.compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
index eeb85ef..2427818 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
@@ -37,7 +37,7 @@ public class StormWordCountLocalITCase extends StreamingProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- this.compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4553c0b..5e7be8d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -956,20 +956,58 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return the data stream constructed
*/
- @SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
+ return addSource(function, sourceName, null);
+ }
- TypeInformation<OUT> typeInfo;
+ /**
+ * Adds a data source with custom type information, thus opening a
+ * {@link DataStream}. Only in very special cases does the user need to
+ * supply type information. Otherwise use
+ * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+ *
+ * @param function
+ * the user defined function
+ * @param <OUT>
+ * type of the returned stream
+ * @param typeInfo
+ * the user defined type information for the stream
+ * @return the data stream constructed
+ */
+ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo) {
+ return addSource(function, "Custom Source", typeInfo);
+ }
- if (function instanceof ResultTypeQueryable) {
- typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
- } else {
- try {
- typeInfo = TypeExtractor.createTypeInfo(
- SourceFunction.class,
- function.getClass(), 0, null, null);
- } catch (InvalidTypesException e) {
- typeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
+ /**
+ * Adds a data source with custom type information, thus opening a
+ * {@link DataStream}. Only in very special cases does the user need to
+ * supply type information. Otherwise use
+ * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+ *
+ * @param function
+ * the user defined function
+ * @param sourceName
+ * Name of the data source
+ * @param <OUT>
+ * type of the returned stream
+ * @param typeInfo
+ * the user defined type information for the stream
+ * @return the data stream constructed
+ */
+ @SuppressWarnings("unchecked")
+ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
+
+ if(typeInfo == null) {
+ if (function instanceof ResultTypeQueryable) {
+ typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
+ } else {
+ try {
+ typeInfo = TypeExtractor.createTypeInfo(
+ SourceFunction.class,
+ function.getClass(), 0, null, null);
+ } catch (final InvalidTypesException e) {
+ typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
+ }
}
}