You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/16 20:13:43 UTC

[1/9] flink git commit: [FLINK-5438] [streaming api] Typo in JobGraph generator Exception

Repository: flink
Updated Branches:
  refs/heads/master ef8cdfe59 -> 7a339a65f


[FLINK-5438] [streaming api] Typo in JobGraph generator Exception

This closes #3098


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2f28c01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2f28c01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2f28c01

Branch: refs/heads/master
Commit: c2f28c013116328583043ca1433c45c85e32de30
Parents: fa67ef4
Author: mtunique <oa...@gmail.com>
Authored: Thu Jan 12 12:02:49 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:47 2017 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamingJobGraphGenerator.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2f28c01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0cb7d9a..1bfaf3f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -317,7 +317,7 @@ public class StreamingJobGraphGenerator {
 			// the parallelism should always be smaller or equal than the max parallelism
 			throw new IllegalStateException("The maximum parallelism (" + maxParallelism + ") of " +
 				"the stream node " + streamNode + " is smaller than the parallelism (" +
-				parallelism + "). Increase the maximum parallelism or decrease the parallelism of" +
+				parallelism + "). Increase the maximum parallelism or decrease the parallelism of " +
 				"this operator.");
 		} else {
 			jobVertex.setMaxParallelism(streamNode.getMaxParallelism());


[2/9] flink git commit: [FLINK-5448] [checkpoints] Fix typo in StateAssignmentOperation Exception

Posted by se...@apache.org.
[FLINK-5448] [checkpoints] Fix typo in StateAssignmentOperation Exception

This closes #3097


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa67ef40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa67ef40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa67ef40

Branch: refs/heads/master
Commit: fa67ef409c9d0d152d22c74e3ace4d56bc8aa7da
Parents: 475c0b1
Author: mtunique <oa...@gmail.com>
Authored: Thu Jan 12 11:55:57 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:47 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/StateAssignmentOperation.java       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa67ef40/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index f11f69b..6c23f02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -109,8 +109,8 @@ public class StateAssignmentOperation {
 			if (hasNonPartitionedState && parallelismChanged) {
 				throw new IllegalStateException("Cannot restore the latest checkpoint because " +
 						"the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
-						"state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-						" has parallelism " + newParallelism + " whereas the corresponding" +
+						"state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() +
+						" has parallelism " + newParallelism + " whereas the corresponding " +
 						"state object has a parallelism of " + oldParallelism);
 			}
 


[5/9] flink git commit: [FLINK-3617] [scala apis] Added null value check.

Posted by se...@apache.org.
[FLINK-3617] [scala apis] Added null value check.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdce1f31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdce1f31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdce1f31

Branch: refs/heads/master
Commit: fdce1f319c512fc845b64cbb7cbfb10f9d899021
Parents: c4626cb
Author: Aleksandr Chermenin <al...@epam.com>
Authored: Fri Dec 16 14:42:50 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:48 2017 +0100

----------------------------------------------------------------------
 .../flink/api/scala/typeutils/CaseClassSerializer.scala  | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdce1f31/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 625ee80..29b4952 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.scala.typeutils
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.types.NullFieldException
 
 /**
  * Serializer for Case Classes. Creation and access is different from
@@ -97,7 +98,13 @@ abstract class CaseClassSerializer[T <: Product](
     var i = 0
     while (i < arity) {
       val serializer = fieldSerializers(i).asInstanceOf[TypeSerializer[Any]]
-      serializer.serialize(value.productElement(i), target)
+      val o = value.productElement(i)
+      try
+        serializer.serialize(o, target)
+      catch {
+        case e: NullPointerException =>
+          throw new NullFieldException(i, e)
+      }
       i += 1
     }
   }


[6/9] flink git commit: [FLINK-5345] [core] Migrate various cleanup calls to concurrency-safe directory deletion

Posted by se...@apache.org.
[FLINK-5345] [core] Migrate various cleanup calls to concurrency-safe directory deletion


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4626cba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4626cba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4626cba

Branch: refs/heads/master
Commit: c4626cbae074ba288e54308c40f93258e14c9667
Parents: 8742ff1
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jan 12 10:49:13 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:48 2017 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java      | 4 +++-
 .../org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java | 6 +++++-
 .../src/main/java/org/apache/flink/runtime/blob/BlobCache.java | 3 ++-
 .../main/java/org/apache/flink/runtime/blob/BlobServer.java    | 3 ++-
 .../org/apache/flink/runtime/io/disk/iomanager/IOManager.java  | 2 +-
 5 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4626cba/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 71e2c79..b207af6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -54,8 +53,10 @@ import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -65,6 +66,7 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4626cba/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3080b57..92c2e36 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -31,7 +32,7 @@ import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
-import org.apache.commons.io.FileUtils;
+
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -81,8 +82,11 @@ import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.util.FileUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.ExecutionContextExecutor;
 import scala.concurrent.Promise;

http://git-wip-us.apache.org/repos/asf/flink/blob/c4626cba/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 3f93652..7ef1f04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4626cba/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 33f9db7..d4190a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -29,7 +28,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4626cba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 7904cc4..6c84d7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.FileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


[8/9] flink git commit: [FLINK-5345] [core] Add a utility to delete directories without failing in the presence of concurrent deletes

Posted by se...@apache.org.
[FLINK-5345] [core] Add a utility to delete directories without failing in the presence of concurrent deletes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8742ff1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8742ff1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8742ff1b

Branch: refs/heads/master
Commit: 8742ff1ba4c345e9aa8fd0adc207930cdef959a6
Parents: faee74e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 11 21:05:57 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:48 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/FileUtils.java   | 171 +++++++++++++++++--
 .../org/apache/flink/util/FileUtilsTest.java    | 162 ++++++++++++++++++
 2 files changed, 319 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8742ff1b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 23f5eb9..0d527d5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -23,27 +23,28 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.StandardOpenOption;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * This is a utility class to deal with temporary files.
+ * This is a utility class to deal with files and directories. Contains utilities for recursive
+ * deletion and creation of temporary files.
  */
 public final class FileUtils {
 
-	/**
-	 * The alphabet to construct the random part of the filename from.
-	 */
-	private static final char[] ALPHABET = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd',
-		'e', 'f' };
+	/** The alphabet to construct the random part of the filename from. */
+	private static final char[] ALPHABET = 
+			{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f' };
 
-	/**
-	 * The length of the random part of the filename.
-	 */
-	private static final int LENGTH = 12;
+	/** The length of the random part of the filename. */
+	private static final int RANDOM_FILE_NAME_LENGTH = 12;
 
-	
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Constructs a random filename with the given prefix and
@@ -54,10 +55,9 @@ public final class FileUtils {
 	 * @return the generated random filename with the given prefix
 	 */
 	public static String getRandomFilename(final String prefix) {
-
 		final StringBuilder stringBuilder = new StringBuilder(prefix);
 
-		for (int i = 0; i < LENGTH; i++) {
+		for (int i = 0; i < RANDOM_FILE_NAME_LENGTH; i++) {
 			stringBuilder.append(ALPHABET[(int) Math.floor(Math.random() * (double) ALPHABET.length)]);
 		}
 
@@ -87,7 +87,150 @@ public final class FileUtils {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Deleting directories
+	//  Deleting directories on standard File Systems
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Removes the given file or directory recursively.
+	 * 
+	 * <p>If the file or directory does not exist, this does not throw an exception, but simply does nothing.
+	 * It considers the fact that a file-to-be-deleted is not present a success.
+	 * 
+	 * <p>This method is safe against other concurrent deletion attempts.
+	 * 
+	 * @param file The file or directory to delete.
+	 * 
+	 * @throws IOException Thrown if the directory could not be cleaned for some reason, for example
+	 *                     due to missing access/write permissions.
+	 */
+	public static void deleteFileOrDirectory(File file) throws IOException {
+		checkNotNull(file, "file");
+
+		if (file.isDirectory()) {
+			// file exists and is directory
+			deleteDirectory(file);
+		}
+		else if (file.exists()) {
+			try {
+				Files.delete(file.toPath());
+			}
+			catch (NoSuchFileException e) {
+				// if the file is already gone (concurrently), we don't mind
+			}
+		}
+		// else: already deleted
+	}
+
+	/**
+	 * Deletes the given directory recursively.
+	 * 
+	 * <p>If the directory does not exist, this does not throw an exception, but simply does nothing.
+	 * It considers the fact that a directory-to-be-deleted is not present a success.
+	 * 
+	 * <p>This method is safe against other concurrent deletion attempts.
+	 * 
+	 * @param directory The directory to be deleted.
+	 * @throws IOException Thrown if the given file is not a directory, or if the directory could not be
+	 *                     deleted for some reason, for example due to missing access/write permissions.
+	 */
+	public static void deleteDirectory(File directory) throws IOException {
+		checkNotNull(directory, "directory");
+
+		if (directory.isDirectory()) {
+			// directory exists and is a directory
+
+			// empty the directory first
+			try {
+				cleanDirectory(directory);
+			}
+			catch (FileNotFoundException ignored) {
+				// someone concurrently deleted the directory, nothing to do for us
+				return;
+			}
+
+			// delete the directory. this fails if the directory is not empty, meaning
+			// if new files got concurrently created. we want to fail then.
+			try {
+				Files.delete(directory.toPath());
+			}
+			catch (NoSuchFileException ignored) {
+				// if someone else deleted this concurrently, we don't mind
+				// the result is the same for us, after all
+			}
+		}
+		else if (directory.exists()) {
+			// exists but is file, not directory
+			// either an error from the caller, or concurrently a file got created
+			throw new IOException(directory + " is not a directory");
+		}
+		// else: does not exist, which is okay (as if deleted)
+	}
+
+	/**
+	 * Deletes the given directory recursively, not reporting any I/O exceptions
+	 * that occur.
+	 * 
+	 * <p>This method is identical to {@link FileUtils#deleteDirectory(File)}, except that it
+	 * swallows all exceptions and may leave the job quietly incomplete.
+	 * 
+	 * @param directory The directory to delete.
+	 */
+	public static void deleteDirectoryQuietly(File directory) {
+		if (directory == null) {
+			return;
+		}
+
+		// delete and do not report if it fails
+		try {
+			deleteDirectory(directory);
+		} catch (Exception ignored) {}
+	}
+
+	/**
+	 * Removes all files contained within a directory, without removing the directory itself. 
+	 * 
+	 * <p>This method is safe against other concurrent deletion attempts.
+	 * 
+	 * @param directory The directory to remove all files from.
+	 * 
+	 * @throws FileNotFoundException Thrown if the directory itself does not exist.
+	 * @throws IOException Thrown if the given path denotes a regular file and not a directory, or if
+	 *                     the directory could not be cleaned for some reason, for example
+	 *                     due to missing access/write permissions.
+	 */
+	public static void cleanDirectory(File directory) throws IOException, FileNotFoundException {
+		checkNotNull(directory, "directory");
+
+		if (directory.isDirectory()) {
+			final File[] files = directory.listFiles();
+
+			if (files == null) {
+				// directory does not exist any more or no permissions
+				if (directory.exists()) {
+					throw new IOException("Failed to list contents of " + directory);
+				} else {
+					throw new FileNotFoundException(directory.toString());
+				}
+			}
+
+			// remove all files in the directory
+			for (File file : files) {
+				if (file != null) {
+					deleteFileOrDirectory(file);
+				}
+			}
+		}
+		else if (directory.exists()) {
+			throw new IOException(directory + " is not a directory but a regular file");
+		}
+		else {
+			// else does not exist at all
+			throw new FileNotFoundException(directory.toString());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Deleting directories on Flink FileSystem abstraction
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8742ff1b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
new file mode 100644
index 0000000..166d24d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.Files;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+public class FileUtilsTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testDeleteQuietly() throws Exception {
+		// should ignore the call
+		FileUtils.deleteDirectoryQuietly(null);
+
+		File doesNotExist = new File(tmp.getRoot(), "abc");
+		FileUtils.deleteDirectoryQuietly(doesNotExist);
+
+		File cannotDeleteParent = tmp.newFolder();
+		File cannotDeleteChild = new File(cannotDeleteParent, "child");
+
+		try {
+			assumeTrue(cannotDeleteChild.createNewFile());
+			assumeTrue(cannotDeleteParent.setWritable(false));
+			assumeTrue(cannotDeleteChild.setWritable(false));
+
+			FileUtils.deleteDirectoryQuietly(cannotDeleteParent);
+		}
+		finally {
+			//noinspection ResultOfMethodCallIgnored
+			cannotDeleteParent.setWritable(true);
+			//noinspection ResultOfMethodCallIgnored
+			cannotDeleteChild.setWritable(true);
+		}
+	}
+
+	@Test
+	public void testDeleteDirectory() throws Exception {
+
+		// deleting a non-existent file should not cause an error
+
+		File doesNotExist = new File(tmp.newFolder(), "abc");
+		FileUtils.deleteDirectory(doesNotExist);
+
+		// deleting a write-protected directory should throw an error
+
+		File cannotDeleteParent = tmp.newFolder();
+		File cannotDeleteChild = new File(cannotDeleteParent, "child");
+
+		try {
+			assumeTrue(cannotDeleteChild.createNewFile());
+			assumeTrue(cannotDeleteParent.setWritable(false));
+			assumeTrue(cannotDeleteChild.setWritable(false));
+
+			FileUtils.deleteDirectory(cannotDeleteParent);
+			fail("this should fail with an exception");
+		}
+		catch (AccessDeniedException ignored) {
+			// this is expected
+		}
+		finally {
+			//noinspection ResultOfMethodCallIgnored
+			cannotDeleteParent.setWritable(true);
+			//noinspection ResultOfMethodCallIgnored
+			cannotDeleteChild.setWritable(true);
+		}
+	}
+
+	@Test
+	public void testDeleteDirectoryConcurrently() throws Exception {
+		final File parent = tmp.newFolder();
+
+		generateRandomDirs(parent, 20, 5, 3);
+
+		// start three concurrent threads that delete the contents
+		CheckedThread t1 = new Deleter(parent);
+		CheckedThread t2 = new Deleter(parent);
+		CheckedThread t3 = new Deleter(parent);
+		t1.start();
+		t2.start();
+		t3.start();
+		t1.sync();
+		t2.sync();
+		t3.sync();
+
+		// assert that the directory itself has been deleted
+		assertFalse(parent.exists());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static void generateRandomDirs(File dir, int numFiles, int numDirs, int depth) throws IOException {
+		// generate the random files
+		for (int i = 0; i < numFiles; i++) {
+			File file = new File(dir, new AbstractID().toString());
+			try (FileOutputStream out = new FileOutputStream(file)) {
+				out.write(1);
+			}
+		}
+
+		if (depth > 0) {
+			// generate the directories
+			for (int i = 0; i < numDirs; i++) {
+				File subdir = new File(dir, new AbstractID().toString());
+				assertTrue(subdir.mkdir());
+				generateRandomDirs(subdir, numFiles, numDirs, depth - 1);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class Deleter extends CheckedThread {
+
+		private final File target;
+
+		Deleter(File target) {
+			this.target = target;
+		}
+
+		@Override
+		public void go() throws Exception {
+			FileUtils.deleteDirectory(target);
+		}
+	}
+}


[3/9] flink git commit: [FLINK-4450] [storm compat] Update storm version to 1.0

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
index 99c2583..10f9797 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.topology.IRichSpout;
+import org.apache.storm.topology.IRichSpout;
 
 /**
  * This interface represents a spout that emits a finite number of records. Common spouts emit infinite streams by

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
index 23d9d70..20e3309 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java
@@ -19,10 +19,10 @@ package org.apache.flink.storm.util;
 
 import java.util.Map;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 
 /**
  * {@link NullTerminatingSpout} in a finite spout (ie, implements {@link FiniteSpout} interface) that wraps an

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
index b79cc4e..9e222ec 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java
@@ -19,8 +19,8 @@ package org.apache.flink.storm.util;
 
 import java.util.List;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.utils.Utils;
 
 /**
  * Observes if a call to any {@code emit(...)} or {@code emitDirect(...)} method is made.

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
index 38ce58c..040c395 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
@@ -1,122 +1,122 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.storm.util;
-
-import backtype.storm.Config;
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
- * object) for embedded Spouts and Bolts.
- */
-@SuppressWarnings("rawtypes")
-public final class StormConfig extends GlobalJobParameters implements Map {
-	private static final long serialVersionUID = 8019519109673698490L;
-
-	/** Contains the actual configuration that is provided to Spouts and Bolts. */
-	private final Map config = new HashMap();
-
-	/**
-	 * Creates an empty configuration.
-	 */
-	public StormConfig() {
-	}
-
-	/**
-	 * Creates an configuration with initial values provided by the given {@code Map}.
-	 * 
-	 * @param config
-	 *            Initial values for this configuration.
-	 */
-	@SuppressWarnings("unchecked")
-	public StormConfig(Map config) {
-		this.config.putAll(config);
-	}
-
-
-	@Override
-	public int size() {
-		return this.config.size();
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return this.config.isEmpty();
-	}
-
-	@Override
-	public boolean containsKey(Object key) {
-		return this.config.containsKey(key);
-	}
-
-	@Override
-	public boolean containsValue(Object value) {
-		return this.config.containsValue(value);
-	}
-
-	@Override
-	public Object get(Object key) {
-		return this.config.get(key);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Object put(Object key, Object value) {
-		return this.config.put(key, value);
-	}
-
-	@Override
-	public Object remove(Object key) {
-		return this.config.remove(key);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void putAll(Map m) {
-		this.config.putAll(m);
-	}
-
-	@Override
-	public void clear() {
-		this.config.clear();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Set<Object> keySet() {
-		return this.config.keySet();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Collection<Object> values() {
-		return this.config.values();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Set<java.util.Map.Entry<Object, Object>> entrySet() {
-		return this.config.entrySet();
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.storm.Config;
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
+ * object) for embedded Spouts and Bolts.
+ */
+@SuppressWarnings("rawtypes")
+public final class StormConfig extends GlobalJobParameters implements Map {
+	private static final long serialVersionUID = 8019519109673698490L;
+
+	/** Contains the actual configuration that is provided to Spouts and Bolts. */
+	private final Map config = new HashMap();
+
+	/**
+	 * Creates an empty configuration.
+	 */
+	public StormConfig() {
+	}
+
+	/**
+	 * Creates an configuration with initial values provided by the given {@code Map}.
+	 * 
+	 * @param config
+	 *            Initial values for this configuration.
+	 */
+	@SuppressWarnings("unchecked")
+	public StormConfig(Map config) {
+		this.config.putAll(config);
+	}
+
+
+	@Override
+	public int size() {
+		return this.config.size();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.config.isEmpty();
+	}
+
+	@Override
+	public boolean containsKey(Object key) {
+		return this.config.containsKey(key);
+	}
+
+	@Override
+	public boolean containsValue(Object value) {
+		return this.config.containsValue(value);
+	}
+
+	@Override
+	public Object get(Object key) {
+		return this.config.get(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Object put(Object key, Object value) {
+		return this.config.put(key, value);
+	}
+
+	@Override
+	public Object remove(Object key) {
+		return this.config.remove(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void putAll(Map m) {
+		this.config.putAll(m);
+	}
+
+	@Override
+	public void clear() {
+		this.config.clear();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<Object> keySet() {
+		return this.config.keySet();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Collection<Object> values() {
+		return this.config.values();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Set<java.util.Map.Entry<Object, Object>> entrySet() {
+		return this.config.entrySet();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
index 2196a1c..7b94707 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.Tuple;
 
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple25;
@@ -88,4 +88,8 @@ class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputC
 	@Override
 	public void fail(final Tuple input) {}
 
+	@Override
+	public void resetTimeout(Tuple var1) {}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index 55a8e28..731f28f 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -17,15 +17,15 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.utils.Utils;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index 52d39a7..f55f0e3 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metric.api.ICombiner;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IReducer;
+import org.apache.storm.metric.api.ReducedMetric;
+import org.apache.storm.state.ISubscribedState;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
 import clojure.lang.Atom;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
index 7a3b6d5..6dd6973 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichBolt;
 
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
index daf9252..d927f0e 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 
 import java.util.HashMap;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
index 0e2190e..5404027 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.spout.ISpoutOutputCollector;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
@@ -79,4 +79,9 @@ class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutO
 		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 	}
 
+	public long getPendingCount() {
+		return 0;
+	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index c171ccc..3dd1e10 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
 
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.common.functions.StoppableFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
index febf0f3..30085fc 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
@@ -19,16 +19,16 @@ package org.apache.flink.storm.wrappers;
 
 /*
  * We do neither import
- * 		backtype.storm.tuple.Tuple;
+ * 		org.apache.storm.tuple.Tuple;
  * nor
  * 		org.apache.flink.api.java.tuple.Tuple
  * to avoid confusion
  */
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Values;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Values;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -37,7 +37,7 @@ import java.util.List;
 /**
  * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
  */
-public class StormTuple<IN> implements backtype.storm.tuple.Tuple {
+public class StormTuple<IN> implements org.apache.storm.tuple.Tuple {
 
 	/** The Storm representation of the original Flink tuple. */
 	private final Values stormTuple;
@@ -55,7 +55,7 @@ public class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 
 	/**
 	 * Create a new Storm tuple from the given Flink tuple.
-	 * 
+	 *
 	 * @param flinkTuple
 	 *            The Flink tuple to be converted.
 	 * @param schema
@@ -389,4 +389,10 @@ public class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 		return "StormTuple{ " + stormTuple.toString() + "[" + this.producerComponentId + ","
 				+ this.producerStreamId + "," + this.producerTaskId + "," + this.messageId + "]}";
 	}
+
+	@Override
+	public GlobalStreamId getSourceGlobalStreamId() {
+		return new GlobalStreamId(this.producerComponentId, this.producerStreamId);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
index 74a12dd..3a9b650 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -16,18 +16,18 @@
  */
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.tuple.Fields;
 import clojure.lang.Atom;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
index 90a82ba..ddbeaff 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
index 39b01d8..0ec0179 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
index 6077534..0f617fb 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
@@ -16,11 +16,11 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
index 846ae51..1b185a7 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
@@ -16,10 +16,10 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
index 1b320e5..9a5b1cd 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
@@ -17,12 +17,12 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
index da2021c..1eaed4a 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.storm.util;
 import java.util.HashMap;
 import java.util.Map;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
index 0e3784a..a5b96bd 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.storm.spout.SpoutOutputCollector;
 
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
index 0fc7df9..2ad8f2e 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
@@ -16,13 +16,13 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
index 7fe8df7..82506e4 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
@@ -16,13 +16,13 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
index c11597c..1f4da55 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
@@ -16,11 +16,11 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.LinkedList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
index e8748d0..9e3165b 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.tuple.Values;
+import org.apache.storm.tuple.Values;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.storm.util.AbstractTest;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index 1440b51..1f8f773 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -338,7 +338,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 		int counter = 0;
 		@Override
-		public void execute(backtype.storm.tuple.Tuple input) {
+		public void execute(org.apache.storm.tuple.Tuple input) {
 			if (++counter % 2 == 1) {
 				this.collector.emit("stream1", new Values(input.getInteger(0)));
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
index de9be2a..9a23b0f 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.api.ICombiner;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IReducer;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
index 481cb5c..94a88fe 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
index fac2582..eb91c63 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.tuple.Values;
+import org.apache.storm.tuple.Values;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.storm.util.AbstractTest;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
index dc84b33..265e705 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.tuple.Fields;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
index eba611e..7ea4b76 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Values;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Values;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple5;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index b4e153a..5e29ac4 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -17,17 +17,17 @@
 
 package org.apache.flink.storm.wrappers;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.util.AbstractTest;
@@ -186,15 +186,15 @@ public class WrapperSetupHelperTest extends AbstractTest {
 				.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
 				.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
 
-		LocalCluster cluster = new LocalCluster();
-		Config c = new Config();
-		c.setNumAckers(0);
-		cluster.submitTopology("test", c, builder.createTopology());
-
-		while (TestSink.result.size() != 8) {
-			Utils.sleep(100);
-		}
-		cluster.shutdown();
+//		LocalCluster cluster = new LocalCluster();
+//		Config c = new Config();
+//		c.setNumAckers(0);
+//		cluster.submitTopology("test", c, builder.createTopology());
+//
+//		while (TestSink.result.size() != 8) {
+//			Utils.sleep(100);
+//		}
+//		cluster.shutdown();
 
 		final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
 		StormTopology stormTopology = flinkBuilder.getStormTopology();


[9/9] flink git commit: [FLINK-4959] [docs] Add documentation for ProcessFunction

Posted by se...@apache.org.
[FLINK-4959] [docs] Add documentation for ProcessFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a339a65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a339a65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a339a65

Branch: refs/heads/master
Commit: 7a339a65f13bfccec1f374e035d557290b45bd01
Parents: fdce1f3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 16 20:17:13 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 21:10:20 2017 +0100

----------------------------------------------------------------------
 docs/concepts/programming-model.md  |  60 ++++++--
 docs/concepts/runtime.md            |  10 +-
 docs/dev/stream/process_function.md | 230 +++++++++++++++++++++++++++++++
 docs/fig/levels_of_abstraction.svg  | 193 ++++++++++++++++++++++++++
 4 files changed, 474 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a339a65/docs/concepts/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/concepts/programming-model.md b/docs/concepts/programming-model.md
index 5ab6b8f..3d2aebb 100644
--- a/docs/concepts/programming-model.md
+++ b/docs/concepts/programming-model.md
@@ -27,11 +27,47 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
+## Levels of Abstraction
+
+Flink offers different levels of abstraction to develop streaming/batch applications.
+
+<img src="../fig/levels_of_abstraction.svg" alt="Programming levels of abstraction" class="offset" width="80%" />
+
+  - The lowest level abstraction simply offers **stateful streaming**. It is embedded into the [DataStream API](../dev/datastream_api.html)
+    via the [Process Function](../dev/stream/process_function.html). It allows users to freely process events from one or more streams,
+    and use consistent fault tolerant *state*. In addition, users can register event time and processing time callbacks,
+    allowing programs to realize sophisticated computations.
+
+  - In practice, most applications would not need the above described low level abstraction, but would instead program against the
+    **Core APIs** like the [DataStream API](../dev/datastream_api.html) (bounded/unbounded streams) and the [DataSet API](../dev/batch/index.html)
+    (bounded data sets). These fluent APIs offer the common building blocks for data processing, like various forms of user-specified
+    transformations, joins, aggregations, windows, state, etc. Data types processed in these APIs are represented as classes
+    in the respective programming languages.
+
+    The low level *Process Function* integrates with the *DataStream API*, making it possible to go to the lower level abstraction
+    for certain operations only. The *DataSet API* offers additional primitives on bounded data sets, like loops/iterations.
+
+  - The **Table API** is a declarative DSL centered around *tables*, which may be dynamically changing tables (when representing streams).
+    The Table API follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases)
+    and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc.
+    Table API programs declaratively define *what logical operation should be done* rather than specifying exactly
+   *how the code for the operation looks*. Though the Table API is extensible by various types of user-defined
+    functions, it is less expressive than the *Core APIs*, but more concise to use (less code to write).
+    In addition, Table API programs also go through an optimizer that applies optimization rules before execution.
+
+    One can seamlessly convert between tables and *DataStream*/*DataSet*, allowing programs to mix the *Table API* with the *DataStream*
+    and *DataSet* APIs.
+
+  - The highest level abstraction offered by Flink is **SQL**. This abstraction is similar to the *Table API* both in semantics and
+    expressiveness, but represents programs as SQL query expressions.
+    The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the *Table API*.
+
+
 ## Programs and Dataflows
 
 The basic building blocks of Flink programs are **streams** and **transformations**. (Note that the
-DataSets used in Flink's batch API are also streams internally -- more about that
-later.) Conceptually a *stream* is a never-ending flow of data records, and a *transformation* is an
+DataSets used in Flink's DataSet API are also streams internally -- more about that
+later.) Conceptually a *stream* is a (potentially never-ending) flow of data records, and a *transformation* is an
 operation that takes one or more streams as input, and produces one or more output streams as a
 result.
 
@@ -40,7 +76,7 @@ Each dataflow starts with one or more **sources** and ends in one or more **sink
 arbitrary **directed acyclic graphs** *(DAGs)*. Although special forms of cycles are permitted via
 *iteration* constructs, for the most part we will gloss over this for simplicity.
 
-<img src="{{ site.baseurl }}/fig/program_dataflow.svg" alt="A DataStream program, and its dataflow." class="offset" width="80%" />
+<img src="../fig/program_dataflow.svg" alt="A DataStream program, and its dataflow." class="offset" width="80%" />
 
 Often there is a one-to-one correspondence between the transformations in the programs and the operators
 in the dataflow. Sometimes, however, one transformation may consist of multiple transformation operators.
@@ -49,19 +85,15 @@ in the dataflow. Sometimes, however, one transformation may consist of multiple
 
 ## Parallel Dataflows
 
-Programs in Flink are inherently parallel and distributed. This parallelism is expressed in Flink's
-DataStream API with the *keyBy()* operator, which can be thought of as a declaration that the stream can
-be operated on in parallel for different values of the key.
-
-*Streams* are split into **stream partitions**, and *operators* are split into **operator
-subtasks**. The operator subtasks are independent of one another, and execute in different threads
+Programs in Flink are inherently parallel and distributed. During execution, a *stream* has one or more **stream partitions**,
+and each *operator* has one or more **operator subtasks**. The operator subtasks are independent of one another, and execute in different threads
 and possibly on different machines or containers.
 
 The number of operator subtasks is the **parallelism** of that particular operator. The parallelism of a stream
 is always that of its producing operator. Different operators of the same program may have different
 levels of parallelism.
 
-<img src="{{ site.baseurl }}/fig/parallel_dataflow.svg" alt="A parallel dataflow" class="offset" width="80%" />
+<img src="../fig/parallel_dataflow.svg" alt="A parallel dataflow" class="offset" width="80%" />
 
 Streams can transport data between two operators in a *one-to-one* (or *forwarding*) pattern, or in a *redistributing* pattern:
 
@@ -93,7 +125,7 @@ Windows can be *time driven* (example: every 30 seconds) or *data driven* (examp
 One typically distinguishes different types of windows, such as *tumbling windows* (no overlap),
 *sliding windows* (with overlap), and *session windows* (punctuated by a gap of inactivity).
 
-<img src="{{ site.baseurl }}/fig/windows.svg" alt="Time- and Count Windows" class="offset" width="80%" />
+<img src="../fig/windows.svg" alt="Time- and Count Windows" class="offset" width="80%" />
 
 More window examples can be found in this [blog post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html).
 
@@ -112,7 +144,7 @@ of time:
 
   - **Processing Time** is the local time at each operator that performs a time-based operation.
 
-<img src="{{ site.baseurl }}/fig/event_ingestion_processing_time.svg" alt="Event Time, Ingestion Time, and Processing Time" class="offset" width="80%" />
+<img src="../fig/event_ingestion_processing_time.svg" alt="Event Time, Ingestion Time, and Processing Time" class="offset" width="80%" />
 
 More details on how to handle time are in the [event time docs]({{ site.baseurl }}/dev/event_time.html).
 
@@ -131,7 +163,7 @@ and is restricted to the values associated with the current event's key. Alignin
 makes sure that all state updates are local operations, guaranteeing consistency without transaction overhead.
 This alignment also allows Flink to redistribute the state and adjust the stream partitioning transparently.
 
-<img src="{{ site.baseurl }}/fig/state_partitioning.svg" alt="State and Partitioning" class="offset" width="50%" />
+<img src="../fig/state_partitioning.svg" alt="State and Partitioning" class="offset" width="50%" />
 
 {% top %}
 
@@ -170,4 +202,4 @@ same way as well as they apply to streaming programs, with minor exceptions:
 
 ## Next Steps
 
-Continue with the basic concepts in Flink's [Distributed Runtime]({{ site.baseurl }}/concepts/runtime).
+Continue with the basic concepts in Flink's [Distributed Runtime](runtime.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/7a339a65/docs/concepts/runtime.md
----------------------------------------------------------------------
diff --git a/docs/concepts/runtime.md b/docs/concepts/runtime.md
index 016861a..0d4e017 100644
--- a/docs/concepts/runtime.md
+++ b/docs/concepts/runtime.md
@@ -35,7 +35,7 @@ The chaining behavior can be configured in the APIs.
 
 The sample dataflow in the figure below is executed with five subtasks, and hence with five parallel threads.
 
-<img src="{{ site.baseurl }}/fig/tasks_chains.svg" alt="Operator chaining into Tasks" class="offset" width="80%" />
+<img src="../fig/tasks_chains.svg" alt="Operator chaining into Tasks" class="offset" width="80%" />
 
 {% top %}
 
@@ -62,7 +62,7 @@ The **client** is not part of the runtime and program execution, but is used to
 After that, the client can disconnect, or stay connected to receive progress reports. The client runs either as part of the
 Java/Scala program that triggers the execution, or in the command line process `./bin/flink run ...`.
 
-<img src="{{ site.baseurl }}/fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="80%" />
+<img src="../fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="80%" />
 
 {% top %}
 
@@ -82,7 +82,7 @@ separate container, for example). Having multiple slots
 means more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and
 heartbeat messages. They may also share data sets and data structures, thus reducing the per-task overhead.
 
-<img src="{{ site.baseurl }}/fig/tasks_slots.svg" alt="A TaskManager with Task Slots and Tasks" class="offset" width="80%" />
+<img src="../fig/tasks_slots.svg" alt="A TaskManager with Task Slots and Tasks" class="offset" width="80%" />
 
 By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, so long as
 they are from the same job. The result is that one slot may hold an entire pipeline of the
@@ -96,7 +96,7 @@ job. Allowing this *slot sharing* has two main benefits:
     With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the
     slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.
 
-<img src="{{ site.baseurl }}/fig/slot_sharing.svg" alt="TaskManagers with shared Task Slots" class="offset" width="80%" />
+<img src="../fig/slot_sharing.svg" alt="TaskManagers with shared Task Slots" class="offset" width="80%" />
 
 The APIs also include a *resource group* mechanism which can be used to prevent undesirable slot sharing. 
 
@@ -112,7 +112,7 @@ stores data in an in-memory hash map, another state backend uses [RocksDB](http:
 In addition to defining the data structure that holds the state, the state backends also implement the logic to
 take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
 
-<img src="{{ site.baseurl }}/fig/checkpoints.svg" alt="checkpoints and snapshots" class="offset" width="60%" />
+<img src="../fig/checkpoints.svg" alt="checkpoints and snapshots" class="offset" width="60%" />
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a339a65/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md
new file mode 100644
index 0000000..a8da4a2
--- /dev/null
+++ b/docs/dev/stream/process_function.md
@@ -0,0 +1,230 @@
+---
+title: "Process Function (Low-level Operations)"
+nav-title: "Process Function"
+nav-parent_id: streaming
+nav-pos: 35
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## The ProcessFunction
+
+The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of
+all (acyclic) streaming applications:
+
+  - events (stream elements)
+  - state (fault tolerant, consistent)
+  - timers (event time and processing time)
+
+The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
+by being invoked for each event received in the input stream(s).
+
+For fault tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the
+`RuntimeContext`, similar to the way other stateful functions can access keyed state. Like all functions with keyed state,
+the `ProcessFunction` needs to be applied onto a `KeyedStream`:
+```java
+stream.keyBy("id").process(new MyProcessFunction())
+```
+
+The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
+Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's
+event time timestamp, and the *TimerService*. The `TimerService` can be used to register callbacks for future
+event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
+called. During that call, all states are again scoped to the key with which the timer was created, allowing
+timers to perform keyed state manipulation as well.
+
+
+## Low-level Joins
+
+To realize low-level operations on two inputs, applications can use the `CoProcessFunction`. It relates to the `ProcessFunction`
+in the same way as a `CoFlatMapFunction` relates to the `FlatMapFunction`: The function is typed to two different inputs and
+gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
+
+Implementing a low level join typically follows this pattern:
+
+  - Create a state object for one input (or both)
+  - Update the state upon receiving elements from its input
+  - Upon receiving elements from the other input, probe the state and produce the joined result
+
+
+## Example
+
+The following example maintains counts per key, and emits the key/count pair if no update happened to the key for one minute
+(in event time):
+
+  - The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key.
+  - For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp
+  - The function also schedules a callback one minute into the future (in event time)
+  - Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
+    and emits the key/count if they match (no further update happened in that minute)
+
+*Note:* This simple example could also have been implemented on top of session windows; we simply use it to illustrate
+the basic pattern of how to use the `ProcessFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
+import org.apache.flink.util.Collector;
+
+
+// the source data stream
+DataStream<Tuple2<String, String>> stream = ...;
+
+// apply the process function onto a keyed stream
+DataStream<Tuple2<String, Long>> result = stream
+    .keyBy(0)
+    .process(new CountWithTimeoutFunction());
+
+/**
+ * The data type stored in the state: a key, its element count, and the time of its last update
+ */
+public class CountWithTimestamp {
+
+    public String key;          // the key being counted
+    public long count;          // number of elements seen for the key
+    public long lastModified;   // event time timestamp of the latest element for the key
+}
+
+/**
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
+
+    /** The state that is maintained by this process function */
+    private ValueState<CountWithTimestamp> state;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
+    }
+
+    @Override
+    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
+            throws Exception {
+
+        // retrieve the current count (null if this is the first element for the key)
+        CountWithTimestamp current = state.value();
+        if (current == null) {
+            current = new CountWithTimestamp();
+            current.key = value.f0;
+        }
+
+        // update the state's count
+        current.count++;
+
+        // set the state's timestamp to the record's assigned event time timestamp
+        current.lastModified = ctx.timestamp();
+
+        // write the state back
+        state.update(current);
+
+        // schedule the next timer 60 seconds from the current event time
+        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
+            throws Exception {
+
+        // get the state for the key that scheduled the timer
+        CountWithTimestamp result = state.value();
+
+        // the timer is the latest one only if no element updated the key since it was registered
+        if (timestamp == result.lastModified + 60000) {
+            // emit the state
+            out.collect(new Tuple2<String, Long>(result.key, result.count));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
+import org.apache.flink.util.Collector;
+
+// the source data stream
+val stream: DataStream[(String, String)] = ...
+
+// apply the process function onto a keyed stream
+val result: DataStream[(String, Long)] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
+
+/**
+ * The data type stored in the state: a key, its element count, and the time of its last update
+ */
+case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
+
+/**
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
+
+  /** The state that is maintained by this process function */
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
+      .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
+
+
+  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
+    // initialize or retrieve/update the state
+
+    val current: CountWithTimestamp = state.value match {
+      case null => // first element for this key
+        CountWithTimestamp(value._1, 1, ctx.timestamp)
+      case CountWithTimestamp(key, count, _) =>
+        CountWithTimestamp(key, count + 1, ctx.timestamp)
+    }
+
+    // write the state back
+    state.update(current)
+
+    // schedule the next timer 60 seconds from the current event time
+    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
+  }
+
+  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
+    state.value match {
+      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
+        out.collect((key, count))
+      case _ => // outdated timer: the key was updated after this timer was registered
+    }
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a339a65/docs/fig/levels_of_abstraction.svg
----------------------------------------------------------------------
diff --git a/docs/fig/levels_of_abstraction.svg b/docs/fig/levels_of_abstraction.svg
new file mode 100644
index 0000000..8f04a31
--- /dev/null
+++ b/docs/fig/levels_of_abstraction.svg
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<svg
+   xmlns:dc="http://purl.org/dc/elements/1.1/"
+   xmlns:cc="http://creativecommons.org/ns#"
+   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+   xmlns:svg="http://www.w3.org/2000/svg"
+   xmlns="http://www.w3.org/2000/svg"
+   version="1.1"
+   width="974.0144"
+   height="409.9375"
+   id="svg2">
+  <defs
+     id="defs4" />
+  <metadata
+     id="metadata7">
+    <rdf:RDF>
+      <cc:Work
+         rdf:about="">
+        <dc:format>image/svg+xml</dc:format>
+        <dc:type
+           rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
+        <dc:title></dc:title>
+      </cc:Work>
+    </rdf:RDF>
+  </metadata>
+  <g
+     transform="translate(258.42828,-167.38041)"
+     id="layer1">
+    <g
+       transform="translate(-323.70953,144.47416)"
+       id="g2989">
+      <path
+         d="m 66.203993,358.32677 0,73.59333 621.867427,0 0,-73.59333 -621.867427,0 z"
+         id="path2991"
+         style="fill:#e4eaf4;fill-opacity:1;fill-rule:evenodd;stroke:none" />
+      <path
+         d="m 66.203993,358.32677 621.867427,0 0,73.59333 -621.867427,0 z"
+         id="path2993"
+         style="fill:none;stroke:#898c92;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
+      <text
+         x="164.98396"
+         y="408.29218"
+         id="text2995"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Stateful</text>
+      <text
+         x="293.41599"
+         y="408.29218"
+         id="text2997"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Stream Processing</text>
+      <path
+         d="m 181.69526,246.88651 0,73.59333 506.37616,0 0,-73.59333 -506.37616,0 z"
+         id="path2999"
+         style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:none" />
+      <path
+         d="m 181.69526,246.88651 506.37616,0 0,73.59333 -506.37616,0 z"
+         id="path3001"
+         style="fill:none;stroke:#935f1c;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
+      <text
+         x="231.63388"
+         y="296.79422"
+         id="text3003"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">DataStream </text>
+      <text
+         x="428.33289"
+         y="296.79422"
+         id="text3005"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">/ </text>
+      <text
+         x="447.83777"
+         y="296.79422"
+         id="text3007"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">DataSet</text>
+      <text
+         x="582.12122"
+         y="296.79422"
+         id="text3009"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">API</text>
+      <path
+         d="m 288.93448,135.44624 0,73.4433 399.13694,0 0,-73.4433 -399.13694,0 z"
+         id="path3011"
+         style="fill:#be73f1;fill-opacity:1;fill-rule:evenodd;stroke:none" />
+      <path
+         d="m 288.93448,135.44624 399.13694,0 0,73.4433 -399.13694,0 z"
+         id="path3013"
+         style="fill:none;stroke:#724591;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
+      <text
+         x="414.60895"
+         y="185.29616"
+         id="text3015"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Table API</text>
+      <path
+         d="m 415.0409,23.855943 0,73.593334 273.03052,0 0,-73.593334 -273.03052,0 z"
+         id="path3017"
+         style="fill:#e6526e;fill-opacity:1;fill-rule:evenodd;stroke:none" />
+      <path
+         d="m 415.0409,23.855943 273.03052,0 0,73.593334 -273.03052,0 z"
+         id="path3019"
+         style="fill:none;stroke:#8a3142;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" />
+      <text
+         x="516.66846"
+         y="73.79821"
+         id="text3021"
+         xml:space="preserve"
+         style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">SQL</text>
+      <text
+         x="722.66699"
+         y="292.85269"
+         id="text3023"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Core </text>
+      <text
+         x="782.38184"
+         y="292.85269"
+         id="text3025"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">APIs</text>
+      <text
+         x="722.66699"
+         y="181.35474"
+         id="text3027"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Declarative DSL</text>
+      <text
+         x="722.66699"
+         y="69.856773"
+         id="text3029"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">High</text>
+      <text
+         x="774.27985"
+         y="69.856773"
+         id="text3031"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">-</text>
+      <text
+         x="782.68195"
+         y="69.856773"
+         id="text3033"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">level Language</text>
+      <text
+         x="722.66699"
+         y="389.2005"
+         id="text3035"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Low</text>
+      <text
+         x="768.72845"
+         y="389.2005"
+         id="text3037"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">-</text>
+      <text
+         x="777.13055"
+         y="389.2005"
+         id="text3039"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">level building block</text>
+      <text
+         x="722.66699"
+         y="419.20798"
+         id="text3041"
+         xml:space="preserve"
+         style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">(streams, state, [event] time)</text>
+    </g>
+  </g>
+</svg>


[4/9] flink git commit: [FLINK-4450] [storm compat] Update storm version to 1.0

Posted by se...@apache.org.
[FLINK-4450] [storm compat] Update storm version to 1.0

This closes #3037


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/475c0b1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/475c0b1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/475c0b1a

Branch: refs/heads/master
Commit: 475c0b1a6c74744e3431b268bc1a2ee764052cf1
Parents: ef8cdfe
Author: yuzhongliu <yu...@tencent.com>
Authored: Thu Dec 22 11:36:37 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:47 2017 +0100

----------------------------------------------------------------------
 flink-contrib/flink-storm-examples/pom.xml      |  88 ++++---
 .../storm/exclamation/ExclamationLocal.java     |  10 +-
 .../storm/exclamation/ExclamationTopology.java  |   4 +-
 .../storm/exclamation/ExclamationWithBolt.java  |   4 +-
 .../storm/exclamation/ExclamationWithSpout.java |   4 +-
 .../exclamation/operators/ExclamationBolt.java  |  14 +-
 .../flink/storm/join/SingleJoinExample.java     |  14 +-
 .../flink/storm/print/PrintSampleStream.java    |  10 +-
 .../storm/split/operators/RandomSpout.java      |  12 +-
 .../split/operators/VerifyAndEnrichBolt.java    |  14 +-
 .../flink/storm/util/AbstractBoltSink.java      |  10 +-
 .../flink/storm/util/AbstractLineSpout.java     |  10 +-
 .../apache/flink/storm/util/BoltFileSink.java   |   2 +-
 .../apache/flink/storm/util/BoltPrintSink.java  |   2 +-
 .../org/apache/flink/storm/util/FileSpout.java  |   6 +-
 .../flink/storm/util/FiniteFileSpout.java       |   6 +-
 .../apache/flink/storm/util/InMemorySpout.java  |   2 +-
 .../flink/storm/util/OutputFormatter.java       |   2 +-
 .../flink/storm/util/SimpleOutputFormatter.java |   2 +-
 .../flink/storm/util/TupleOutputFormatter.java  |   2 +-
 .../storm/wordcount/BoltTokenizerWordCount.java |   2 +-
 .../wordcount/BoltTokenizerWordCountPojo.java   |   2 +-
 .../BoltTokenizerWordCountWithNames.java        |   4 +-
 .../storm/wordcount/SpoutSourceWordCount.java   |   4 +-
 .../flink/storm/wordcount/WordCountLocal.java   |   8 +-
 .../storm/wordcount/WordCountLocalByName.java   |   8 +-
 .../wordcount/WordCountRemoteByClient.java      |  16 +-
 .../wordcount/WordCountRemoteBySubmitter.java   |   8 +-
 .../storm/wordcount/WordCountTopology.java      |   6 +-
 .../storm/wordcount/operators/BoltCounter.java  |  14 +-
 .../wordcount/operators/BoltCounterByName.java  |  14 +-
 .../wordcount/operators/BoltTokenizer.java      |  14 +-
 .../operators/BoltTokenizerByName.java          |  14 +-
 .../wordcount/operators/WordCountFileSpout.java |   4 +-
 .../operators/WordCountInMemorySpout.java       |   4 +-
 .../org/apache/flink/storm/split/SplitBolt.java |  14 +-
 .../flink/storm/split/SplitBoltTopology.java    |   2 +-
 .../flink/storm/split/SplitSpoutTopology.java   |   2 +-
 .../flink/storm/split/SplitStreamBoltLocal.java |   4 +-
 .../storm/split/SplitStreamSpoutLocal.java      |   4 +-
 .../storm/tests/StormFieldsGroupingITCase.java  |   6 +-
 .../flink/storm/tests/StormMetaDataITCase.java  |   4 +-
 .../flink/storm/tests/StormUnionITCase.java     |   4 +-
 .../tests/operators/FiniteRandomSpout.java      |  14 +-
 .../flink/storm/tests/operators/MergerBolt.java |  12 +-
 .../storm/tests/operators/MetaDataSpout.java    |  12 +-
 .../flink/storm/tests/operators/TaskIdBolt.java |  14 +-
 .../tests/operators/VerifyMetaDataBolt.java     |  16 +-
 flink-contrib/flink-storm/pom.xml               |  59 +++--
 .../org/apache/flink/storm/api/FlinkClient.java |  20 +-
 .../flink/storm/api/FlinkLocalCluster.java      |  14 +-
 .../storm/api/FlinkOutputFieldsDeclarer.java    |   8 +-
 .../apache/flink/storm/api/FlinkSubmitter.java  |  12 +-
 .../apache/flink/storm/api/FlinkTopology.java   |  18 +-
 .../flink/storm/api/StormFlinkStreamMerger.java |   6 +-
 .../flink/storm/api/TwoFlinkStreamsMerger.java  |   6 +-
 .../apache/flink/storm/util/FiniteSpout.java    |   2 +-
 .../flink/storm/util/NullTerminatingSpout.java  |   8 +-
 .../util/SpoutOutputCollectorObserver.java      |   4 +-
 .../apache/flink/storm/util/StormConfig.java    | 244 +++++++++----------
 .../flink/storm/wrappers/BoltCollector.java     |   8 +-
 .../flink/storm/wrappers/BoltWrapper.java       |  18 +-
 .../storm/wrappers/FlinkTopologyContext.java    |  20 +-
 .../storm/wrappers/MergedInputsBoltWrapper.java |   2 +-
 .../wrappers/SetupOutputFieldsDeclarer.java     |   6 +-
 .../flink/storm/wrappers/SpoutCollector.java    |   7 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |   8 +-
 .../apache/flink/storm/wrappers/StormTuple.java |  20 +-
 .../storm/wrappers/WrapperSetupHelper.java      |  24 +-
 .../api/FlinkOutputFieldsDeclarerTest.java      |   4 +-
 .../flink/storm/api/FlinkTopologyTest.java      |   4 +-
 .../org/apache/flink/storm/api/TestBolt.java    |  10 +-
 .../org/apache/flink/storm/api/TestSpout.java   |   8 +-
 .../flink/storm/util/FiniteTestSpout.java       |  12 +-
 .../storm/util/NullTerminatingSpoutTest.java    |   8 +-
 .../util/SpoutOutputCollectorObserverTest.java  |   2 +-
 .../apache/flink/storm/util/TestDummyBolt.java  |  14 +-
 .../apache/flink/storm/util/TestDummySpout.java |  14 +-
 .../org/apache/flink/storm/util/TestSink.java   |  10 +-
 .../flink/storm/wrappers/BoltCollectorTest.java |   2 +-
 .../flink/storm/wrappers/BoltWrapperTest.java   |  18 +-
 .../wrappers/FlinkTopologyContextTest.java      |  14 +-
 .../wrappers/SetupOutputFieldsDeclarerTest.java |   4 +-
 .../storm/wrappers/SpoutCollectorTest.java      |   2 +-
 .../flink/storm/wrappers/SpoutWrapperTest.java  |   8 +-
 .../flink/storm/wrappers/StormTupleTest.java    |   8 +-
 .../storm/wrappers/WrapperSetupHelperTest.java  |  40 +--
 87 files changed, 598 insertions(+), 546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index a200bb0..b9da214 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -18,7 +18,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
 	<modelVersion>4.0.0</modelVersion>
 
@@ -40,12 +40,16 @@ under the License.
 		<repository>
 			<id>clojars</id>
 			<url>https://clojars.org/repo/</url>
-			<releases><enabled>true</enabled></releases>
-			<snapshots><enabled>false</enabled></snapshots>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
 		</repository>
 	</repositories>
 
-	
+
 	<dependencies>
 
 		<!-- core dependencies -->
@@ -71,8 +75,8 @@ under the License.
 		<dependency>
 			<groupId>org.apache.storm</groupId>
 			<artifactId>storm-starter</artifactId>
-			<version>0.9.4</version>
-			
+			<version>1.0.0</version>
+
 			<!-- remove storm dependency - it should be drawn only (with proper
 				customization) via the 'flink-storm' dependency -->
 			<exclusions>
@@ -125,7 +129,8 @@ under the License.
 									<type>jar</type>
 									<overWrite>false</overWrite>
 									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class
+									</includes>
 								</artifactItem>
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
@@ -138,7 +143,7 @@ under the License.
 								<artifactItem>
 									<groupId>org.apache.storm</groupId>
 									<artifactId>storm-core</artifactId>
-									<version>0.9.4</version>
+									<version>1.0.0</version>
 									<type>jar</type>
 									<overWrite>false</overWrite>
 									<outputDirectory>${project.build.directory}/classes</outputDirectory>
@@ -197,15 +202,16 @@ under the License.
 
 							<includes>
 								<!-- from storm-core -->
-								<include>backtype/storm/topology/*.class</include>
-								<include>backtype/storm/spout/*.class</include>
-								<include>backtype/storm/task/*.class</include>
-								<include>backtype/storm/tuple/*.class</include>
-								<include>backtype/storm/generated/*.class</include>
-								<include>backtype/storm/metric/**/*.class</include>
-								<include>org/apache/thrift7/**/*.class</include>
+								<include>org/apache/storm/topology/*.class</include>
+								<include>org/apache/storm/spout/*.class</include>
+								<include>org/apache/storm/task/*.class</include>
+								<include>org/apache/storm/tuple/*.class</include>
+								<include>org/apache/storm/generated/*.class</include>
+								<include>org/apache/storm/metric/**/*.class</include>
+								<include>org/apache/storm/thrift/**/*.class</include>
 								<!-- Storm's recursive dependencies -->
 								<include>org/json/simple/**/*.class</include>
+								<include>org/apache/storm/shade/**/*.class</include>
 								<!-- compatibility layer -->
 								<include>org/apache/flink/storm/api/*.class</include>
 								<include>org/apache/flink/storm/util/*.class</include>
@@ -214,7 +220,8 @@ under the License.
 								<include>org/apache/flink/storm/wordcount/SpoutSourceWordCount.class</include>
 								<include>org/apache/flink/storm/wordcount/SpoutSourceWordCount$*.class</include>
 								<include>org/apache/flink/storm/wordcount/operators/WordCountFileSpout.class</include>
-								<include>org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.class</include>
+								<include>org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.class
+								</include>
 								<include>org/apache/flink/storm/util/AbstractLineSpout.class</include>
 								<include>org/apache/flink/storm/util/FileSpout.class</include>
 								<include>org/apache/flink/storm/util/InMemorySpout.class</include>
@@ -237,21 +244,23 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.storm.wordcount.BoltTokenizerWordCount</program-class>
+									<program-class>org.apache.flink.storm.wordcount.BoltTokenizerWordCount
+									</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
 								<!-- from storm-core -->
-								<include>backtype/storm/topology/*.class</include>
-								<include>backtype/storm/spout/*.class</include>
-								<include>backtype/storm/task/*.class</include>
-								<include>backtype/storm/tuple/*.class</include>
-								<include>backtype/storm/generated/*.class</include>
-								<include>backtype/storm/metric/**/*.class</include>
-								<include>org/apache/thrift7/**/*.class</include>
+								<include>org/apache/storm/topology/*.class</include>
+								<include>org/apache/storm/spout/*.class</include>
+								<include>org/apache/storm/task/*.class</include>
+								<include>org/apache/storm/tuple/*.class</include>
+								<include>org/apache/storm/generated/*.class</include>
+								<include>org/apache/storm/metric/**/*.class</include>
+								<include>org/apache/storm/thrift/**/*.class</include>
 								<!-- Storm's recursive dependencies -->
 								<include>org/json/simple/**/*.class</include>
+								<include>org/apache/storm/shade/**/*.class</include>
 								<!-- compatibility layer -->
 								<include>org/apache/flink/storm/api/*.class</include>
 								<include>org/apache/flink/storm/util/*.class</include>
@@ -322,30 +331,34 @@ under the License.
 									<artifact>org.apache.storm:storm-core</artifact>
 									<includes>
 										<include>defaults.yaml</include>
-										<include>backtype/storm/*.class</include>
-										<include>backtype/storm/topology/*.class</include>
-										<include>backtype/storm/spout/*.class</include>
-										<include>backtype/storm/task/*.class</include>
-										<include>backtype/storm/tuple/*.class</include>
-										<include>backtype/storm/generated/*.class</include>
-										<include>backtype/storm/metric/**/*.class</include>
-										<include>backtype/storm/utils/*.class</include>
-										<include>backtype/storm/serialization/*.class</include>
+										<include>org/apache/storm/*.class</include>
+										<include>org/apache/storm/topology/*.class</include>
+										<include>org/apache/storm/spout/*.class</include>
+										<include>org/apache/storm/task/*.class</include>
+										<include>org/apache/storm/tuple/*.class</include>
+										<include>org/apache/storm/generated/*.class</include>
+										<include>org/apache/storm/metric/**/*.class</include>
+										<include>org/apache/storm/utils/*.class</include>
+										<include>org/apache/storm/serialization/*.class</include>
 										<include>org/apache/storm/curator/**/*.class</include>
-										<include>org/apache/thrift7/**/*.class</include>
+										<include>org/apache/storm/grouping/**/*.class</include>
+										<include>org/apache/storm/thrift/**/*.class</include>
 										<!-- Storm's recursive dependencies -->
 										<include>org/json/simple/**/*.class</include>
 										<include>org/yaml/snakeyaml/**/*.class</include>
+										<include>org/apache/storm/shade/**/*.class</include>
 									</includes>
 								</filter>
 								<filter>
 									<artifact>org.apache.flink:flink-storm-examples_2.10</artifact>
 									<includes>
-										<include>org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.class</include>
+										<include>org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.class
+										</include>
 										<include>org/apache/flink/storm/wordcount/WordCountTopology.class</include>
 										<include>org/apache/flink/storm/wordcount/operators/*.class</include>
 										<include>org/apache/flink/storm/util/*.class</include>
-										<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+										<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class
+										</include>
 									</includes>
 								</filter>
 								<filter>
@@ -358,7 +371,8 @@ under the License.
 								</filter>
 							</filters>
 							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
 									<mainClass>org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter</mainClass>
 								</transformer>
 							</transformers>

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
index 3f2d806..c37ae65 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
@@ -17,19 +17,19 @@
 
 package org.apache.flink.storm.exclamation;
 
-import backtype.storm.Config;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
 
 /**
  * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology} and submitted to
- * Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}.
+ * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology} and submitted to
+ * Flink for execution in the same way as to a Storm {@link org.apache.storm.LocalCluster}.
  * <p>
  * This example shows how to run program directly within Java, thus it cannot be used to submit a
- * {@link backtype.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
+ * {@link org.apache.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
  * <p>
  * The input is a plain text file with lines separated by newline characters.
  * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
index 43f526b..0144acb 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.exclamation;
 
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
 import org.apache.flink.storm.util.BoltFileSink;
@@ -29,7 +29,7 @@ import org.apache.flink.storm.util.SimpleOutputFormatter;
 
 /**
  * Implements the "Exclamation" program that attaches two exclamation marks to every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}.
+ * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}.
  * <p>
  * The input is a plain text file with lines separated by newline characters.
  * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
index b47c0fa..5a79119 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.storm.exclamation;
 
-import backtype.storm.utils.Utils;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**
  * Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}.
+ * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}.
  * <p>
  * The input is a plain text file with lines separated by newline characters.
  * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
index 380d9da..237f1d4 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.storm.exclamation;
 
-import backtype.storm.utils.Utils;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**
  * Implements the "Exclamation" program that attaches six exclamation marks to every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}.
+ * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}.
  * <p>
  * The input is a plain text file with lines separated by newline characters.
  * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
index 9bc00d2..77a91d2 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.storm.exclamation.operators;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
index 3ccd885..41ea4cb 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.storm.join;
 
-import backtype.storm.Config;
-import backtype.storm.testing.FeederSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.Config;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
@@ -29,8 +29,8 @@ import org.apache.flink.storm.util.BoltFileSink;
 import org.apache.flink.storm.util.NullTerminatingSpout;
 import org.apache.flink.storm.util.TupleOutputFormatter;
 
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.bolt.SingleJoinBolt;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.bolt.SingleJoinBolt;
 
 
 public class SingleJoinExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
index de652cf..da2e641 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.storm.print;
 
-import backtype.storm.Config;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.spout.TwitterSampleSpout;
+import org.apache.storm.starter.bolt.PrinterBolt;
+import org.apache.storm.starter.spout.TwitterSampleSpout;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
index d315395..5fbe0a7 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
@@ -20,12 +20,12 @@ package org.apache.flink.storm.split.operators;
 import java.util.Map;
 import java.util.Random;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class RandomSpout extends BaseRichSpout {
 	private static final long serialVersionUID = -3978554318742509334L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
index 434d091..1ad9a6c 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
@@ -19,13 +19,13 @@ package org.apache.flink.storm.split.operators;
 
 import java.util.Map;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 public class VerifyAndEnrichBolt extends BaseRichBolt {
 	private static final long serialVersionUID = -7277395570966328721L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
index a6c61d4..2cb346a 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
index d19ffbf..29df23e 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
index 5cd3f68..cbbe191 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.task.TopologyContext;
 
 import java.io.BufferedWriter;
 import java.io.FileWriter;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
index 044246b..a80417b 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.task.TopologyContext;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
index 1126a2a..0a295e7 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
 
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
index 4b41f8a..48349c2 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
@@ -16,9 +16,9 @@
  */
 package org.apache.flink.storm.util;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
 
 import java.io.IOException;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
index 5e4c7ba..de1ca20 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.tuple.Values;
+import org.apache.storm.tuple.Values;
 
 /**
  * Implements a Spout that reads data stored in memory.

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
index e696f9b..fe28afc 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
index cef0081..323fb53 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
 
 public class SimpleOutputFormatter implements OutputFormatter {
 	private static final long serialVersionUID = 6349573860144270338L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
index 5d7ba53..11d23cd 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.storm.util;
 
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.tuple.Tuple;
 
 public class TupleOutputFormatter implements OutputFormatter {
 	private static final long serialVersionUID = -599665757723851761L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
index cccf4c0..4620d9d 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichBolt;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
index 9b0d4ee..eefbf78 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichBolt;
 import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.io.PojoCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
index 50d4518..98f7f96 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Fields;
 import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
index 281780e..683a3b5 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.utils.Utils;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
index 2fa79ac..ee880ba 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkLocalCluster;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
index 046bc04..ab423cf 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkLocalCluster;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
index 6b718b9..5c99f93 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.NotAliveException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
index eb2713d..08ba52a 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
 
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkClient;
 import org.apache.flink.storm.api.FlinkSubmitter;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
index e4117f4..8f855b5 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.storm.wordcount;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.util.BoltFileSink;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
index d21a584..4a00869 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.HashMap;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
index d5c05d7..e3e0d58 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.HashMap;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
index 74d6a99..cedd90a 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
index 3c56b36..258d412 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
index 76a198f..1298422 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
@@ -19,8 +19,8 @@ package org.apache.flink.storm.wordcount.operators;
 
 import org.apache.flink.storm.util.FileSpout;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
 
 /**
  * Implements a Spout that reads data from a given local file.

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
index c06c268..7bf40c2 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.util.FiniteInMemorySpout;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
index c7b9c1d..0fc1ba5 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
@@ -19,13 +19,13 @@ package org.apache.flink.storm.split;
 
 import java.util.Map;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 public class SplitBolt extends BaseRichBolt {
 	private static final long serialVersionUID = -6627606934204267173L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
index 52fbc2a..04cfeed 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.storm.split;
 
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.flink.storm.split.operators.RandomSpout;
 import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
 import org.apache.flink.storm.util.BoltFileSink;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
index 2527616..8671d2e 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.storm.split;
 
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.flink.storm.split.operators.RandomSpout;
 import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
 import org.apache.flink.storm.util.BoltFileSink;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
index ad334ae..2cde11e 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
@@ -16,8 +16,8 @@
  */
 package org.apache.flink.storm.split;
 
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.Utils;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
index 69b40e8..be880d0 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
@@ -16,8 +16,8 @@
  */
 package org.apache.flink.storm.split;
 
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.Utils;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index 5df1337..581f7c1 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -17,9 +17,9 @@
  */
 package org.apache.flink.storm.tests;
 
-import backtype.storm.Config;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
index ce869df..b19e106 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.storm.tests;
 
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.Utils;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
 
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
index 2518f35..452fef5 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.storm.tests;
 
-import backtype.storm.Config;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
 
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
index 39072eb..da8d21a 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
@@ -22,13 +22,13 @@ import java.util.Random;
 
 import org.apache.flink.storm.util.FiniteSpout;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
 
 public class FiniteRandomSpout extends BaseRichSpout implements FiniteSpout {
 	private static final long serialVersionUID = 6592885571932363239L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java
index 2c353bf..7a6ec58 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java
@@ -19,12 +19,12 @@ package org.apache.flink.storm.tests.operators;
 
 import java.util.Map;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
 
 public class MergerBolt extends BaseRichBolt {
 	private static final long serialVersionUID = -7966475984592762720L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java
index 2937909..a6e19b9 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java
@@ -19,12 +19,12 @@ package org.apache.flink.storm.tests.operators;
 
 import java.util.Map;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class MetaDataSpout extends BaseRichSpout {
 	private static final long serialVersionUID = 5305870218033256376L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
index b69dde7..fb5c8d3 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
@@ -19,13 +19,13 @@ package org.apache.flink.storm.tests.operators;
 
 import java.util.Map;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 /**
  * Bolt to prepend all incoming tuple values with the task id.

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java
index a02f1f9..f7b40fc 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java
@@ -19,14 +19,14 @@ package org.apache.flink.storm.tests.operators;
 
 import java.util.Map;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 
 public class VerifyMetaDataBolt extends BaseRichBolt {
 	private static final long serialVersionUID = 1353222852073800478L;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 1169cd4..b6cc61c 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -18,7 +18,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
 	<modelVersion>4.0.0</modelVersion>
 
@@ -40,8 +40,12 @@ under the License.
 		<repository>
 			<id>clojars</id>
 			<url>https://clojars.org/repo/</url>
-			<releases><enabled>true</enabled></releases>
-			<snapshots><enabled>false</enabled></snapshots>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
 		</repository>
 	</repositories>
 
@@ -71,15 +75,43 @@ under the License.
 		<dependency>
 			<groupId>org.apache.storm</groupId>
 			<artifactId>storm-core</artifactId>
-			<version>0.9.4</version>
+			<version>1.0.0</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.slf4j</groupId>
 					<artifactId>log4j-over-slf4j</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>ch.qos.logback</groupId>
-					<artifactId>logback-classic</artifactId>
+					<groupId>org.apache.logging.log4j</groupId>
+					<artifactId>log4j-slf4j-impl</artifactId>
+				</exclusion>
+				<exclusion>
+					<artifactId>slf4j-log4j12</artifactId>
+					<groupId>org.slf4j</groupId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.servlet</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>junit</groupId>
+					<artifactId>junit</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mockito</groupId>
+					<artifactId>mockito-all</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mockito</groupId>
+					<artifactId>mockito-all</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.curator</groupId>
+					<artifactId>curator-test</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.esotericsoftware</groupId>
+					<artifactId>kryo</artifactId>
 				</exclusion>
 				<exclusion>
 					<groupId>ring</groupId>
@@ -102,10 +134,6 @@ under the License.
 					<artifactId>jetty</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
 					<groupId>org.jgrapht</groupId>
 					<artifactId>jgrapht-core</artifactId>
 				</exclusion>
@@ -114,10 +142,6 @@ under the License.
 					<artifactId>compojure</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>com.esotericsoftware.reflectasm</groupId>
-					<artifactId>reflectasm</artifactId>
-				</exclusion>
-				<exclusion>
 					<groupId>com.twitter</groupId>
 					<artifactId>chill-java</artifactId>
 				</exclusion>
@@ -149,7 +173,12 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-		
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1</version>
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 6019aa3..c58a8ee 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -22,18 +22,14 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.NotAliveException;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
 import com.esotericsoftware.kryo.Serializer;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.ClusterClient;
@@ -52,11 +48,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index da19a22..367b313 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.storm.api;
 
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.RebalanceOptions;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInfo;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
index 794beee..b0bebef 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.storm.api;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -29,7 +29,7 @@ import java.util.List;
 
 /**
  * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
- * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br />
+ * {@link org.apache.storm.topology.IRichSpout spout} or {@link org.apache.storm.topology.IRichBolt bolt}.<br />
  * <br />
  * <strong>CAUTION: Flink does not support direct emit.</strong>
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index f8932b1..3b191b0 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -16,12 +16,12 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.utils.Utils;
 
 import java.net.URISyntaxException;
 import java.net.URL;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
index 2546f17..2b36feb 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
@@ -18,15 +18,15 @@
  */
 package org.apache.flink.storm.api;
 
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.IRichStateSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.IRichStateSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java
index 72c1569..160a7d9 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java
@@ -16,9 +16,9 @@ import org.apache.flink.storm.wrappers.StormTuple;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.util.Collector;
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
 
 /**
  * Merges a stream of type {@link StormTuple} with a Flink {@link DataStreams} into a stream of type {@link StormTuple}.

http://git-wip-us.apache.org/repos/asf/flink/blob/475c0b1a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java
index 1866e32..1e6e2ed 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java
@@ -16,9 +16,9 @@ import org.apache.flink.storm.wrappers.StormTuple;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.util.Collector;
 
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
 
 /**
  * Merges two Flink {@link DataStreams} into a stream of type {@link StormTuple}.


[7/9] flink git commit: [FLINK-5485] [webfrontend] Mark compiled web frontend files as binary when processed by git diff

Posted by se...@apache.org.
[FLINK-5485] [webfrontend] Mark compiled web frontend files as binary when processed by git diff

Mark compiled web frontend files as binary when processed by git diff
(by unsetting the `diff` attribute in .gitattributes). This is particularly
beneficial now that the JavaScript is minified.
  https://linux.die.net/man/5/gitattributes

This does not affect how files are displayed by GitHub.

This closes #3122


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/faee74ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/faee74ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/faee74ed

Branch: refs/heads/master
Commit: faee74ed8ba2476117b6c9e56932b3541f69ac5c
Parents: c2f28c0
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jan 13 13:33:29 2017 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:48 2017 +0100

----------------------------------------------------------------------
 .gitattributes | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/faee74ed/.gitattributes
----------------------------------------------------------------------
diff --git a/.gitattributes b/.gitattributes
index b68afa6..ecc9cf2 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1,3 +1,3 @@
 *.bat text eol=crlf
-flink-runtime-web/web-dashboard/web/* linguist-vendored
+flink-runtime-web/web-dashboard/web/* linguist-vendored -diff