You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/16 20:13:45 UTC

[3/9] flink git commit: [FLINK-4450] [storm compat] Update storm version to 1.0

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
index 99c2583..10f9797 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.topology.IRichSpout;
+import org.apache.storm.topology.IRichSpout;
 
 /**
  * This interface represents a spout that emits a finite number of records. Common spouts emit infinite streams by

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
index 23d9d70..20e3309 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
@@ -19,10 +19,10 @@ package org.apache.flink.storm.util;
 
 import java.util.Map;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 
 /**
  * {@link NullTerminatingSpout} in a finite spout (ie, implements {@link FiniteSpout} interface) that wraps an

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
index b79cc4e..9e222ec 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
@@ -19,8 +19,8 @@ package org.apache.flink.storm.util;
 
 import java.util.List;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.utils.Utils;
 
 /**
  * Observes if a call to any {@code emit(...)} or {@code emitDirect(...)} method is made.

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
index 38ce58c..040c395 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
@@ -1,122 +1,122 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.storm.util;
-
-import backtype.storm.Config;
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
- * object) for embedded Spouts and Bolts.
- */
-@SuppressWarnings("rawtypes")
-public final class StormConfig extends GlobalJobParameters implements Map {
-	private static final long serialVersionUID = 8019519109673698490L;
-
-	/** Contains the actual configuration that is provided to Spouts and Bolts. */
-	private final Map config = new HashMap();
-
-	/**
-	 * Creates an empty configuration.
-	 */
-	public StormConfig() {
-	}
-
-	/**
-	 * Creates an configuration with initial values provided by the given {@code Map}.
-	 * 
-	 * @param config
-	 *            Initial values for this configuration.
-	 */
-	@SuppressWarnings("unchecked")
-	public StormConfig(Map config) {
-		this.config.putAll(config);
-	}
-
-
-	@Override
-	public int size() {
-		return this.config.size();
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return this.config.isEmpty();
-	}
-
-	@Override
-	public boolean containsKey(Object key) {
-		return this.config.containsKey(key);
-	}
-
-	@Override
-	public boolean containsValue(Object value) {
-		return this.config.containsValue(value);
-	}
-
-	@Override
-	public Object get(Object key) {
-		return this.config.get(key);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Object put(Object key, Object value) {
-		return this.config.put(key, value);
-	}
-
-	@Override
-	public Object remove(Object key) {
-		return this.config.remove(key);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void putAll(Map m) {
-		this.config.putAll(m);
-	}
-
-	@Override
-	public void clear() {
-		this.config.clear();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Set<Object> keySet() {
-		return this.config.keySet();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Collection<Object> values() {
-		return this.config.values();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Set<java.util.Map.Entry<Object, Object>> entrySet() {
-		return this.config.entrySet();
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.storm.Config;
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
+ * object) for embedded Spouts and Bolts.
+ */
+@SuppressWarnings("rawtypes")
+public final class StormConfig extends GlobalJobParameters implements Map {
+	private static final long serialVersionUID = 8019519109673698490L;
+
+	/** Contains the actual configuration that is provided to Spouts and Bolts. */
+	private final Map config = new HashMap();
+
+	/**
+	 * Creates an empty configuration.
+	 */
+	public StormConfig() {
+	}
+
+	/**
+	 * Creates an configuration with initial values provided by the given {@code Map}.
+	 * 
+	 * @param config
+	 *            Initial values for this configuration.
+	 */
+	@SuppressWarnings("unchecked")
+	public StormConfig(Map config) {
+		this.config.putAll(config);
+	}
+
+
+	@Override
+	public int size() {
+		return this.config.size();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.config.isEmpty();
+	}
+
+	@Override
+	public boolean containsKey(Object key) {
+		return this.config.containsKey(key);
+	}
+
+	@Override
+	public boolean containsValue(Object value) {
+		return this.config.containsValue(value);
+	}
+
+	@Override
+	public Object get(Object key) {
+		return this.config.get(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Object put(Object key, Object value) {
+		return this.config.put(key, value);
+	}
+
+	@Override
+	public Object remove(Object key) {
+		return this.config.remove(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void putAll(Map m) {
+		this.config.putAll(m);
+	}
+
+	@Override
+	public void clear() {
+		this.config.clear();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<Object> keySet() {
+		return this.config.keySet();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Collection<Object> values() {
+		return this.config.values();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<java.util.Map.Entry<Object, Object>> entrySet() {
+		return this.config.entrySet();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
index 2196a1c..7b94707 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.Tuple;
 
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple25;
@@ -88,4 +88,8 @@ class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputC
 	@Override
 	public void fail(final Tuple input) {}
 
+	@Override
+	public void resetTimeout(Tuple var1) {}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index 55a8e28..731f28f 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -17,15 +17,15 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.utils.Utils;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index 52d39a7..f55f0e3 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metric.api.ICombiner;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IReducer;
+import org.apache.storm.metric.api.ReducedMetric;
+import org.apache.storm.state.ISubscribedState;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
 import clojure.lang.Atom;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
index 7a3b6d5..6dd6973 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichBolt;
 
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
index daf9252..d927f0e 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 
 import java.util.HashMap;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
index 0e2190e..5404027 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.spout.ISpoutOutputCollector;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -79,4 +79,9 @@ class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutO
 		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 	}
 
+	public long getPendingCount() {
+		return 0;
+	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index c171ccc..3dd1e10 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
 
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.common.functions.StoppableFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
index febf0f3..30085fc 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
@@ -19,16 +19,16 @@ package org.apache.flink.storm.wrappers;
 
 /*
  * We do neither import
- * 		backtype.storm.tuple.Tuple;
+ * 		org.apache.storm.tuple.Tuple;
  * nor
  * 		org.apache.flink.api.java.tuple.Tuple
  * to avoid confusion
  */
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Values;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Values;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -37,7 +37,7 @@ import java.util.List;
 /**
  * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
  */
-public class StormTuple<IN> implements backtype.storm.tuple.Tuple {
+public class StormTuple<IN> implements org.apache.storm.tuple.Tuple {
 
 	/** The Storm representation of the original Flink tuple. */
 	private final Values stormTuple;
@@ -55,7 +55,7 @@ public class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 
 	/**
 	 * Create a new Storm tuple from the given Flink tuple.
-	 * 
+	 *
 	 * @param flinkTuple
 	 *            The Flink tuple to be converted.
 	 * @param schema
@@ -389,4 +389,10 @@ public class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 		return "StormTuple{ " + stormTuple.toString() + "[" + this.producerComponentId + ","
 				+ this.producerStreamId + "," + this.producerTaskId + "," + this.messageId + "]}";
 	}
+
+	@Override
+	public GlobalStreamId getSourceGlobalStreamId() {
+		return new GlobalStreamId(this.producerComponentId, this.producerStreamId);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
index 74a12dd..3a9b650 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -16,18 +16,18 @@
  */
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.tuple.Fields;
 import clojure.lang.Atom;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
index 90a82ba..ddbeaff 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
index 39b01d8..0ec0179 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
index 6077534..0f617fb 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
@@ -16,11 +16,11 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
index 846ae51..1b185a7 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
@@ -16,10 +16,10 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
index 1b320e5..9a5b1cd 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
@@ -17,12 +17,12 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
index da2021c..1eaed4a 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.storm.util;
 import java.util.HashMap;
 import java.util.Map;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
index 0e3784a..a5b96bd 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.storm.spout.SpoutOutputCollector;
 
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
index 0fc7df9..2ad8f2e 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
@@ -16,13 +16,13 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
index 7fe8df7..82506e4 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
@@ -16,13 +16,13 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
index c11597c..1f4da55 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
@@ -16,11 +16,11 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.LinkedList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
index e8748d0..9e3165b 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.tuple.Values;
+import org.apache.storm.tuple.Values;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.storm.util.AbstractTest;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index 1440b51..1f8f773 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -338,7 +338,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 		int counter = 0;
 		@Override
-		public void execute(backtype.storm.tuple.Tuple input) {
+		public void execute(org.apache.storm.tuple.Tuple input) {
 			if (++counter % 2 == 1) {
 				this.collector.emit("stream1", new Values(input.getInteger(0)));
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
index de9be2a..9a23b0f 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.api.ICombiner;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IReducer;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
index 481cb5c..94a88fe 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
index fac2582..eb91c63 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.tuple.Values;
+import org.apache.storm.tuple.Values;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.storm.util.AbstractTest;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
index dc84b33..265e705 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.tuple.Fields;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
index eba611e..7ea4b76 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Values;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Values;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple5;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index b4e153a..5e29ac4 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -17,17 +17,17 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.util.AbstractTest;
@@ -186,15 +186,15 @@ public class WrapperSetupHelperTest extends AbstractTest {
 				.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
 				.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
 
-		LocalCluster cluster = new LocalCluster();
-		Config c = new Config();
-		c.setNumAckers(0);
-		cluster.submitTopology("test", c, builder.createTopology());
-
-		while (TestSink.result.size() != 8) {
-			Utils.sleep(100);
-		}
-		cluster.shutdown();
+//		LocalCluster cluster = new LocalCluster();
+//		Config c = new Config();
+//		c.setNumAckers(0);
+//		cluster.submitTopology("test", c, builder.createTopology());
+//
+//		while (TestSink.result.size() != 8) {
+//			Utils.sleep(100);
+//		}
+//		cluster.shutdown();
 
 		final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
 		StormTopology stormTopology = flinkBuilder.getStormTopology();