You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/01 17:53:56 UTC

[1/8] flink git commit: [FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to customize queue config

Repository: flink
Updated Branches:
  refs/heads/master 7ea9c0195 -> 873d6cd18


[FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to customize queue config

This closes #2281


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f52d11af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f52d11af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f52d11af

Branch: refs/heads/master
Commit: f52d11af65d962fb79fe365c71938afae3fcbc11
Parents: 923c6a7
Author: philippgrulich <ph...@hotmail.de>
Authored: Thu Jul 21 13:31:24 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f52d11af/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index be7e946..a0795d6 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -39,11 +39,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
 
-	private String queueName;
-	private RMQConnectionConfig rmqConnectionConfig;
-	private transient Connection connection;
-	private transient Channel channel;
-	private SerializationSchema<IN> schema;
+	protected final String queueName;
+	private final RMQConnectionConfig rmqConnectionConfig;
+	protected transient Connection connection;
+	protected transient Channel channel;
+	protected SerializationSchema<IN> schema;
 	private boolean logFailuresOnly = false;
 
 	/**
@@ -58,6 +58,15 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	}
 
 	/**
+	 * Sets up the queue. The default implementation just declares the queue. The user may override
+	 * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
+	 * defining custom queue parameters)
+	 */
+	protected void setupQueue() throws IOException {
+		channel.queueDeclare(queueName, false, false, false, null);
+	}
+
+	/**
 	 * Defines whether the producer should fail on errors, or only log them.
 	 * If this is set to true, then exceptions will be only logged, if set to false,
 	 * exceptions will be eventually thrown and cause the streaming program to
@@ -79,7 +88,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 			if (channel == null) {
 				throw new RuntimeException("None of RabbitMQ channels are available");
 			}
-			channel.queueDeclare(queueName, false, false, false, null);
+			setupQueue();
 		} catch (IOException e) {
 			throw new RuntimeException("Error while creating the channel", e);
 		}


[2/8] flink git commit: [FLINK-4292] [batch connectors] Fix setup of HCatalog project by adding Scala SDK dependency

Posted by se...@apache.org.
[FLINK-4292] [batch connectors] Fix setup of HCatalog project by adding Scala SDK dependency


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/923c6a7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/923c6a7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/923c6a7e

Branch: refs/heads/master
Commit: 923c6a7ee0f137bb06868f17ccda06fab026ca1b
Parents: ba12a75
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 12:03:33 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200

----------------------------------------------------------------------
 flink-batch-connectors/flink-hcatalog/pom.xml                 | 7 ++++++-
 .../org/apache/flink/hcatalog/scala/HCatInputFormat.scala     | 2 +-
 flink-streaming-scala/pom.xml                                 | 2 +-
 3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/923c6a7e/flink-batch-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml
index f79431d..f23ad12 100644
--- a/flink-batch-connectors/flink-hcatalog/pom.xml
+++ b/flink-batch-connectors/flink-hcatalog/pom.xml
@@ -48,6 +48,12 @@ under the License.
 			<version>0.12.0</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
@@ -56,7 +62,6 @@ under the License.
 			<plugin>
 				<groupId>net.alchim31.maven</groupId>
 				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
 				<executions>
 					<!-- Run scala compiler in the process-resources phase, so that dependencies on
 						scala classes can be resolved later in the (Java) compile phase -->

http://git-wip-us.apache.org/repos/asf/flink/blob/923c6a7e/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
index d5a3cbf..0299ee1 100644
--- a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -28,7 +28,7 @@ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
  * A InputFormat to read from HCatalog tables.
  * The InputFormat supports projection (selection and order of fields) and partition filters.
  *
- * Data can be returned as {@link HCatRecord} or Scala tuples.
+ * Data can be returned as [[HCatRecord]] or Scala tuples.
  * Scala tuples support only up to 22 fields.
  *
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/923c6a7e/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index 0ea80d0..ce56167 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -215,7 +215,7 @@ under the License.
 				</configuration>
 			</plugin>
 
-			<!-- Exclude generated classes from api compatibilty checks -->
+			<!-- Exclude generated classes from api compatibility checks -->
 			<plugin>
 				<groupId>com.github.siom79.japicmp</groupId>
 				<artifactId>japicmp-maven-plugin</artifactId>


[8/8] flink git commit: [FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds

Posted by se...@apache.org.
[FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds

Cassandra needs Java 8 to run reliably.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/873d6cd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/873d6cd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/873d6cd1

Branch: refs/heads/master
Commit: 873d6cd184d7e4f3ad314eefa273b88e08ea6b0a
Parents: 0ea2596
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 16:55:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200

----------------------------------------------------------------------
 .../runtime/testutils/CommonTestUtils.java      |  9 +-
 .../cassandra/CassandraConnectorTest.java       | 92 ++++++++++++++------
 2 files changed, 67 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 6bd8b34..59c37b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -51,7 +51,7 @@ public class CommonTestUtils {
 			try {
 				Thread.sleep(remaining);
 			}
-			catch (InterruptedException e) {}
+			catch (InterruptedException ignored) {}
 			
 			now = System.currentTimeMillis();
 		}
@@ -137,8 +137,7 @@ public class CommonTestUtils {
 	}
 
 	public static void printLog4jDebugConfig(File file) throws IOException {
-		FileWriter fw = new FileWriter(file);
-		try {
+		try (FileWriter fw = new FileWriter(file)) {
 			PrintWriter writer = new PrintWriter(fw);
 
 			writer.println("log4j.rootLogger=DEBUG, console");
@@ -152,9 +151,6 @@ public class CommonTestUtils {
 			writer.flush();
 			writer.close();
 		}
-		finally {
-			fw.close();
-		}
 	}
 
 	public static File createTempDirectory() throws IOException {
@@ -165,7 +161,6 @@ public class CommonTestUtils {
 			if (!dir.exists() && dir.mkdirs()) {
 				return dir;
 			}
-			System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
 		}
 
 		throw new IOException("Could not create temporary file directory");

http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index 8d0c02e..2018255 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
@@ -23,9 +24,12 @@ import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+
 import org.apache.cassandra.service.CassandraDaemon;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -42,6 +46,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -49,10 +54,12 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,16 +71,21 @@ import java.util.ArrayList;
 import java.util.Scanner;
 import java.util.UUID;
 
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
 	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
 	private static File tmpDir;
 
 	private static final boolean EMBEDDED = true;
+
 	private static EmbeddedCassandraService cassandra;
-	private transient static ClusterBuilder builder = new ClusterBuilder() {
+
+	private static ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
 			return builder
@@ -83,6 +95,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 				.withoutMetrics().build();
 		}
 	};
+
 	private static Cluster cluster;
 	private static Session session;
 
@@ -97,7 +110,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	static {
 		for (int i = 0; i < 20; i++) {
-			collection.add(new Tuple3<>("" + UUID.randomUUID(), i, 0));
+			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
 		}
 	}
 
@@ -115,15 +128,36 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 		}
 	}
 
-	//=====Setup========================================================================================================
+	// ------------------------------------------------------------------------
+	//  Cassandra Cluster Setup
+	// ------------------------------------------------------------------------
+
 	@BeforeClass
 	public static void startCassandra() throws IOException {
-		//generate temporary files
+
+		// check if we should run this test, current Cassandra version requires Java >= 1.8
+		try {
+			String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
+			float javaVersion = Float.parseFloat(javaVersionString);
+			Assume.assumeTrue(javaVersion >= 1.8f);
+		}
+		catch (AssumptionViolatedException e) {
+			System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
+			throw e;
+		}
+		catch (Exception e) {
+			LOG.error("Cannot determine Java version", e);
+			e.printStackTrace();
+			fail("Cannot determine Java version");
+		}
+
+		// generate temporary files
 		tmpDir = CommonTestUtils.createTempDirectory();
-		ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader();
+		ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
 		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
 		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-		tmp.createNewFile();
+		
+		assertTrue(tmp.createNewFile());
 		BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
 
 		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
@@ -139,7 +173,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 		// Tell cassandra where the configuration files are.
 		// Use the test configuration file.
-		System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath());
+		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
 
 		if (EMBEDDED) {
 			cassandra = new EmbeddedCassandraService();
@@ -160,13 +194,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	@Before
 	public void checkIfIgnore() {
-		String runtime = System.getProperty("java.runtime.name");
-				String version = System.getProperty("java.runtime.version");
-		LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version);
-		// The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues.
-		// Oracle JDK identifies itself as "Java(TM) SE Runtime Environment"
-		// OpenJDK is "OpenJDK Runtime Environment"
-		Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7"));
+		
 	}
 
 	@After
@@ -176,13 +204,23 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	@AfterClass
 	public static void closeCassandra() {
-		session.executeAsync(DROP_KEYSPACE_QUERY);
-		session.close();
-		cluster.close();
-		if (EMBEDDED) {
+		if (session != null) {
+			session.executeAsync(DROP_KEYSPACE_QUERY);
+			session.close();
+		}
+
+		if (cluster != null) {
+			cluster.close();
+		}
+
+		if (cassandra != null) {
 			cassandra.stop();
 		}
-		tmpDir.delete();
+
+		if (tmpDir != null) {
+			//noinspection ResultOfMethodCallIgnored
+			tmpDir.delete();
+		}
 	}
 
 	//=====Exactly-Once=================================================================================================
@@ -202,7 +240,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	@Override
 	protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
-		return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID);
+		return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
 	}
 
 	@Override
@@ -379,7 +417,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
 			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
-			new TupleTypeInfo(Tuple3.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
 
 
 		long count = inputDS.count();


[6/8] flink git commit: [hotfix] Remove various '.hidden' files that seem to have been accidentally committed

Posted by se...@apache.org.
[hotfix] Remove various '.hidden' files that seem to have been accidentally committed


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb98256c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb98256c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb98256c

Branch: refs/heads/master
Commit: cb98256cfc8627aa9b26d8614d23a76d44fb010b
Parents: f52d11a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 12:17:35 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/contrib/.hidden                   | 0
 flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden     | 0
 .../src/test/java/org/apache/flink/contrib/.hidden                   | 0
 3 files changed, 0 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb98256c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/flink/blob/cb98256c/flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden b/flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/flink/blob/cb98256c/flink-contrib/flink-tweet-inputformat/src/test/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/test/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-tweet-inputformat/src/test/java/org/apache/flink/contrib/.hidden
deleted file mode 100644
index e69de29..0000000


[4/8] flink git commit: [hotfix] [docs] Remove obsolete file 'plotPoints.py'

Posted by se...@apache.org.
[hotfix] [docs] Remove obsolete file 'plotPoints.py'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8847955f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8847955f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8847955f

Branch: refs/heads/master
Commit: 8847955fac2f98773d1bc86feac247dd93d91e61
Parents: 7ea9c01
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 10:01:35 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200

----------------------------------------------------------------------
 docs/quickstart/plotPoints.py | 100 -------------------------------------
 1 file changed, 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8847955f/docs/quickstart/plotPoints.py
----------------------------------------------------------------------
diff --git a/docs/quickstart/plotPoints.py b/docs/quickstart/plotPoints.py
deleted file mode 100755
index 9ddf5dd..0000000
--- a/docs/quickstart/plotPoints.py
+++ /dev/null
@@ -1,100 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-# http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import matplotlib.pyplot as plt
-import csv
-import os
-
-if len(sys.argv) < 4 or not sys.argv[1] in ['points', 'result']:
-  print "Usage: plot-clusters.py (points|result) <src-file> <pdf-file-prefix>"
-  sys.exit(1)
-
-inFile = sys.argv[1]
-inFile = sys.argv[2]
-outFilePx = sys.argv[3]
-
-inFileName = os.path.splitext(os.path.basename(inFile))[0]
-outFile = os.path.join(".", outFilePx+"-plot.pdf")
-
-########### READ DATA
-
-cs = []
-xs = []
-ys = []
-
-minX = None
-maxX = None
-minY = None
-maxY = None
-
-if sys.argv[1] == 'points':
-
-  with open(inFile, 'rb') as file:
-    for line in file:
-      # parse data
-      csvData = line.strip().split(' ')
-
-      x = float(csvData[0])
-      y = float(csvData[1])
-
-      if not minX or minX > x:
-        minX = x
-      if not maxX or maxX < x:
-        maxX = x
-      if not minY or minY > y:
-        minY = y
-      if not maxY or maxY < y:
-        maxY = y
-
-      xs.append(x)
-      ys.append(y)
-
-    # plot data
-    plt.clf()
-    plt.scatter(xs, ys, s=25, c="#999999", edgecolors='None', alpha=1.0)
-    plt.ylim([minY,maxY])
-    plt.xlim([minX,maxX])
-
-elif sys.argv[1] == 'result':
-
-  with open(inFile, 'rb') as file:
-    for line in file:
-      # parse data
-      csvData = line.strip().split(' ')
-
-      c = int(csvData[0])
-      x = float(csvData[1])
-      y = float(csvData[2])
-
-      cs.append(c)
-      xs.append(x)
-      ys.append(y)
-
-    # plot data
-    plt.clf()
-    plt.scatter(xs, ys, s=25, c=cs, edgecolors='None', alpha=1.0)
-    plt.ylim([minY,maxY])
-    plt.xlim([minX,maxX])
-
-
-plt.savefig(outFile, dpi=600)
-print "\nPlotted file: %s" % outFile
-
-sys.exit(0)
\ No newline at end of file


[3/8] flink git commit: [FLINK-4285] [docs] Update the setup quickstart guide with the new SocketWindowWordCount example

Posted by se...@apache.org.
[FLINK-4285] [docs] Update the setup quickstart guide with the new SocketWindowWordCount example


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba12a751
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba12a751
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba12a751

Branch: refs/heads/master
Commit: ba12a7514ca7f4548cc9cb166844dd6308138672
Parents: 8847955
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 11:22:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200

----------------------------------------------------------------------
 docs/quickstart/setup_quickstart.md                          | 8 +++-----
 .../streaming/examples/socket/SocketWindowWordCount.java     | 2 +-
 .../scala/examples/socket/SocketWindowWordCount.scala        | 2 +-
 3 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba12a751/docs/quickstart/setup_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/setup_quickstart.md b/docs/quickstart/setup_quickstart.md
index 261f14f..2c3e6fd 100644
--- a/docs/quickstart/setup_quickstart.md
+++ b/docs/quickstart/setup_quickstart.md
@@ -56,7 +56,7 @@ Check the __JobManager's web frontend__ at [http://localhost:8081](http://localh
 
 ## Run Example
 
-Now, we are going to run the [SocketTextStreamWordCount example](https://github.com/apache/flink/blob/release-1.0.0/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java) and read text from a socket and count the number of distinct words.
+Now, we are going to run the [SocketWindowWordCount example](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java) and read text from a socket and count the number of distinct words.
 
 * First of all, we use **netcat** to start local server via
 
@@ -67,10 +67,8 @@ Now, we are going to run the [SocketTextStreamWordCount example](https://github.
 * Submit the Flink program:
 
   ~~~bash
-  $ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \
-    --hostname localhost \
-    --port 9000
-  Printing result to stdout. Use --output to specify output path.
+  $ bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
+
   03/08/2016 17:21:56 Job execution switched to status RUNNING.
   03/08/2016 17:21:56 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
   03/08/2016 17:21:56 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING

http://git-wip-us.apache.org/repos/asf/flink/blob/ba12a751/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index 10e8ca0..d6cbe87 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -48,7 +48,7 @@ public class SocketWindowWordCount {
 			final ParameterTool params = ParameterTool.fromArgs(args);
 			port = params.getInt("port");
 		} catch (Exception e) {
-			System.err.println("No port specified. Please run 'WindowWordCount --port <port>', " +
+			System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
 					"where port is the address of the text server");
 			System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text " +
 					"into the command line");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba12a751/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index e942bb5..3b432ec 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -43,7 +43,7 @@ object SocketWindowWordCount {
       ParameterTool.fromArgs(args).getInt("port")
     } catch {
       case e: Exception => {
-        System.err.println("No port specified. Please run 'WindowWordCount --port <port>', " +
+        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
           "where port is the address of the text server")
         System.err.println("To start a simple text server, run 'netcat -l <port>' " +
           "and type the input text into the command line")


[7/8] flink git commit: [FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'

Posted by se...@apache.org.
[FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea2596e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea2596e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea2596e

Branch: refs/heads/master
Commit: 0ea2596e1b605c1eedb843273660ef1366463313
Parents: 4456453
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 15:58:12 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200

----------------------------------------------------------------------
 flink-contrib/flink-storm/pom.xml               | 83 +++++++-------------
 .../org/apache/flink/storm/api/FlinkClient.java |  5 +-
 .../flink/storm/wrappers/BoltWrapper.java       | 19 +++--
 .../storm/wrappers/FlinkTopologyContext.java    |  3 +-
 .../storm/wrappers/MergedInputsBoltWrapper.java |  6 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |  8 +-
 .../storm/wrappers/WrapperSetupHelperTest.java  | 12 ++-
 7 files changed, 60 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 590f33d..0ac49db 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -61,18 +61,40 @@ under the License.
 					<artifactId>log4j-over-slf4j</artifactId>
 				</exclusion>
 				<exclusion>
-					<artifactId>logback-classic</artifactId>
 					<groupId>ch.qos.logback</groupId>
+					<artifactId>logback-classic</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-devel</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-servlet</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ring</groupId>
+					<artifactId>ring-jetty-adapter</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.jgrapht</groupId>
+					<artifactId>jgrapht-core</artifactId>
 				</exclusion>
 			</exclusions>
 		</dependency>
 
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
 		<!-- test dependencies -->
 
 		<dependency>
@@ -85,51 +107,4 @@ under the License.
 		
 	</dependencies>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-
-	</build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 9628bb7..f4bcfb7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import backtype.storm.Config;
 import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
@@ -32,7 +33,6 @@ import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
 import com.esotericsoftware.kryo.Serializer;
-import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,7 +216,7 @@ public class FlinkClient {
 
 		try {
 			ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
-					Lists.newArrayList(uploadedJarUrl),
+					Collections.<URL>singletonList(uploadedJarUrl),
 					Collections.<URL>emptyList(),
 					this.getClass().getClassLoader());
 			client.runDetached(jobGraph, classLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index 5311cb3..6e316e7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.generated.GlobalStreamId;
@@ -26,8 +27,6 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.utils.Utils;
 
-import com.google.common.collect.Sets;
-
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -44,6 +43,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
  * It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
@@ -135,9 +136,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not within range
 	 *             [1;25].
 	 */
-	public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
+	public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) 
 			throws IllegalArgumentException {
-		this(bolt, null, Sets.newHashSet(rawOutputs));
+		this(bolt, null, asList(rawOutputs));
 	}
 
 	/**
@@ -157,8 +158,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
 	 *             [1;25].
 	 */
-	public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
-			throws IllegalArgumentException {
+	public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) throws IllegalArgumentException {
 		this(bolt, null, rawOutputs);
 	}
 
@@ -181,9 +181,12 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
 	 *             [0;25].
 	 */
-	public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final String[] rawOutputs)
+	public BoltWrapper(
+			final IRichBolt bolt,
+			final Fields inputSchema,
+			final String[] rawOutputs) 
 			throws IllegalArgumentException {
-		this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
+		this(bolt, inputSchema, asList(rawOutputs));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index db1d147..52d39a7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -120,9 +120,8 @@ final class FlinkTopologyContext extends TopologyContext {
 	 * @throws UnsupportedOperationException
 	 * 		at every invocation
 	 */
-	@SuppressWarnings("unchecked")
 	@Override
-	public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
+	public <T extends IMetric> T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) {
 		throw new UnsupportedOperationException("Metrics are not supported by Flink");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
index 89defde..7a3b6d5 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.topology.IRichBolt;
-import com.google.common.collect.Sets;
 
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects input tuples of type {@link StormTuple}. It
  * can be used to wrap a multi-input bolt and assumes that all input stream got merged into a {@link StormTuple} stream
@@ -67,7 +69,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup
 	 */
 	public MergedInputsBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
 			throws IllegalArgumentException {
-		super(bolt, Sets.newHashSet(rawOutputs));
+		super(bolt, asList(rawOutputs));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 66b05c6..c171ccc 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -22,8 +22,6 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 
-import com.google.common.collect.Sets;
-
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -37,6 +35,8 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import java.util.Collection;
 import java.util.HashMap;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
  * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -121,7 +121,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp
 	 */
 	public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
 			throws IllegalArgumentException {
-		this(spout, Sets.newHashSet(rawOutputs), null);
+		this(spout, asList(rawOutputs), null);
 	}
 
 	/**
@@ -147,7 +147,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp
 	 */
 	public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
 			final Integer numberOfInvocations) throws IllegalArgumentException {
-		this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations);
+		this(spout, asList(rawOutputs), numberOfInvocations);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 82b12d6..000fe84 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.Config;
@@ -27,16 +28,18 @@ import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-import com.google.common.collect.Sets;
+
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -48,6 +51,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import static java.util.Collections.singleton;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -85,7 +90,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
 		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		WrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
-				Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
+				new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -143,8 +148,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
 
 		Assert.assertEquals(attributes, WrapperSetupHelper.getNumberOfAttributes(
 				boltOrSpout,
-				numberOfAttributes == -1 ? Sets
-						.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
+				numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null));
 	}
 
 	@Test


[5/8] flink git commit: [hotfix] [build] Update LICENSE file with new URL of simplejmx dependency

Posted by se...@apache.org.
[hotfix] [build] Update LICENSE file with new URL of simplejmx dependency


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4456453b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4456453b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4456453b

Branch: refs/heads/master
Commit: 4456453b5ce7c706529ad06a18dc83bb1238ceef
Parents: cb98256
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 14:15:40 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200

----------------------------------------------------------------------
 LICENSE | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4456453b/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index dcfc16c..07df05f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -305,9 +305,11 @@ Open Font License (OFT) - http://scripts.sil.org/OFL
 -----------------------------------------------------------------------
  The ISC License
 -----------------------------------------------------------------------
- 
- The Apache Flink project contains code under the ISC license from the following files:
-  - simplejmx (http://256.com/sources/simplejmx/) Copyright (c) - Gray Watson
+
+The Apache Flink project contains or reuses code that is licensed under the ISC license
+from the following projects:
+
+  - simplejmx (http://256stuff.com/sources/simplejmx/) Copyright (c) - Gray Watson
 
 Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
 granted, provided that this permission notice appear in all copies.