You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/01 17:53:56 UTC
[1/8] flink git commit: [FLINK-4251] [Rabbit MQ] Allow users to
override queue setup in order to customize queue config
Repository: flink
Updated Branches:
refs/heads/master 7ea9c0195 -> 873d6cd18
[FLINK-4251] [Rabbit MQ] Allow users to override queue setup in order to customize queue config
This closes #2281
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f52d11af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f52d11af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f52d11af
Branch: refs/heads/master
Commit: f52d11af65d962fb79fe365c71938afae3fcbc11
Parents: 923c6a7
Author: philippgrulich <ph...@hotmail.de>
Authored: Thu Jul 21 13:31:24 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200
----------------------------------------------------------------------
.../streaming/connectors/rabbitmq/RMQSink.java | 21 ++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f52d11af/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index be7e946..a0795d6 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -39,11 +39,11 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
- private String queueName;
- private RMQConnectionConfig rmqConnectionConfig;
- private transient Connection connection;
- private transient Channel channel;
- private SerializationSchema<IN> schema;
+ protected final String queueName;
+ private final RMQConnectionConfig rmqConnectionConfig;
+ protected transient Connection connection;
+ protected transient Channel channel;
+ protected SerializationSchema<IN> schema;
private boolean logFailuresOnly = false;
/**
@@ -58,6 +58,15 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
}
/**
+ * Sets up the queue. The default implementation just declares the queue. The user may override
+ * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
+ * defining custom queue parameters)
+ */
+ protected void setupQueue() throws IOException {
+ channel.queueDeclare(queueName, false, false, false, null);
+ }
+
+ /**
* Defines whether the producer should fail on errors, or only log them.
* If this is set to true, then exceptions will be only logged, if set to false,
* exceptions will be eventually thrown and cause the streaming program to
@@ -79,7 +88,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
}
- channel.queueDeclare(queueName, false, false, false, null);
+ setupQueue();
} catch (IOException e) {
throw new RuntimeException("Error while creating the channel", e);
}
[2/8] flink git commit: [FLINK-4292] [batch connectors] Fix setup of
HCatalog project by adding Scala SDK dependency
Posted by se...@apache.org.
[FLINK-4292] [batch connectors] Fix setup of HCatalog project by adding Scala SDK dependency
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/923c6a7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/923c6a7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/923c6a7e
Branch: refs/heads/master
Commit: 923c6a7ee0f137bb06868f17ccda06fab026ca1b
Parents: ba12a75
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 12:03:33 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200
----------------------------------------------------------------------
flink-batch-connectors/flink-hcatalog/pom.xml | 7 ++++++-
.../org/apache/flink/hcatalog/scala/HCatInputFormat.scala | 2 +-
flink-streaming-scala/pom.xml | 2 +-
3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/923c6a7e/flink-batch-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml
index f79431d..f23ad12 100644
--- a/flink-batch-connectors/flink-hcatalog/pom.xml
+++ b/flink-batch-connectors/flink-hcatalog/pom.xml
@@ -48,6 +48,12 @@ under the License.
<version>0.12.0</version>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
<build>
@@ -56,7 +62,6 @@ under the License.
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
- <version>3.1.4</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
http://git-wip-us.apache.org/repos/asf/flink/blob/923c6a7e/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
index d5a3cbf..0299ee1 100644
--- a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -28,7 +28,7 @@ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
* A InputFormat to read from HCatalog tables.
* The InputFormat supports projection (selection and order of fields) and partition filters.
*
- * Data can be returned as {@link HCatRecord} or Scala tuples.
+ * Data can be returned as [[HCatRecord]] or Scala tuples.
* Scala tuples support only up to 22 fields.
*
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/923c6a7e/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index 0ea80d0..ce56167 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -215,7 +215,7 @@ under the License.
</configuration>
</plugin>
- <!-- Exclude generated classes from api compatibilty checks -->
+ <!-- Exclude generated classes from api compatibility checks -->
<plugin>
<groupId>com.github.siom79.japicmp</groupId>
<artifactId>japicmp-maven-plugin</artifactId>
[8/8] flink git commit: [FLINK-4290] [Cassandra Connector] Skip
CassandraConnectorTest on Java 7 builds
Posted by se...@apache.org.
[FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds
Cassandra needs Java 8 to run reliably.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/873d6cd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/873d6cd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/873d6cd1
Branch: refs/heads/master
Commit: 873d6cd184d7e4f3ad314eefa273b88e08ea6b0a
Parents: 0ea2596
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 16:55:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200
----------------------------------------------------------------------
.../runtime/testutils/CommonTestUtils.java | 9 +-
.../cassandra/CassandraConnectorTest.java | 92 ++++++++++++++------
2 files changed, 67 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 6bd8b34..59c37b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -51,7 +51,7 @@ public class CommonTestUtils {
try {
Thread.sleep(remaining);
}
- catch (InterruptedException e) {}
+ catch (InterruptedException ignored) {}
now = System.currentTimeMillis();
}
@@ -137,8 +137,7 @@ public class CommonTestUtils {
}
public static void printLog4jDebugConfig(File file) throws IOException {
- FileWriter fw = new FileWriter(file);
- try {
+ try (FileWriter fw = new FileWriter(file)) {
PrintWriter writer = new PrintWriter(fw);
writer.println("log4j.rootLogger=DEBUG, console");
@@ -152,9 +151,6 @@ public class CommonTestUtils {
writer.flush();
writer.close();
}
- finally {
- fw.close();
- }
}
public static File createTempDirectory() throws IOException {
@@ -165,7 +161,6 @@ public class CommonTestUtils {
if (!dir.exists() && dir.mkdirs()) {
return dir;
}
- System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
}
throw new IOException("Could not create temporary file directory");
http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index 8d0c02e..2018255 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
import com.datastax.driver.core.Cluster;
@@ -23,9 +24,12 @@ import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+
import org.apache.cassandra.service.CassandraDaemon;
+
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -42,6 +46,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -49,10 +54,12 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
+
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,16 +71,21 @@ import java.util.ArrayList;
import java.util.Scanner;
import java.util.UUID;
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
private static File tmpDir;
private static final boolean EMBEDDED = true;
+
private static EmbeddedCassandraService cassandra;
- private transient static ClusterBuilder builder = new ClusterBuilder() {
+
+ private static ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
@@ -83,6 +95,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
.withoutMetrics().build();
}
};
+
private static Cluster cluster;
private static Session session;
@@ -97,7 +110,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
static {
for (int i = 0; i < 20; i++) {
- collection.add(new Tuple3<>("" + UUID.randomUUID(), i, 0));
+ collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
}
}
@@ -115,15 +128,36 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
}
}
- //=====Setup========================================================================================================
+ // ------------------------------------------------------------------------
+ // Cassandra Cluster Setup
+ // ------------------------------------------------------------------------
+
@BeforeClass
public static void startCassandra() throws IOException {
- //generate temporary files
+
+ // check if we should run this test, current Cassandra version requires Java >= 1.8
+ try {
+ String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
+ float javaVersion = Float.parseFloat(javaVersionString);
+ Assume.assumeTrue(javaVersion >= 1.8f);
+ }
+ catch (AssumptionViolatedException e) {
+ System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
+ throw e;
+ }
+ catch (Exception e) {
+ LOG.error("Cannot determine Java version", e);
+ e.printStackTrace();
+ fail("Cannot determine Java version");
+ }
+
+ // generate temporary files
tmpDir = CommonTestUtils.createTempDirectory();
- ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader();
+ ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
File file = new File(classLoader.getResource("cassandra.yaml").getFile());
File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
- tmp.createNewFile();
+
+ assertTrue(tmp.createNewFile());
BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
//copy cassandra.yaml; inject absolute paths into cassandra.yaml
@@ -139,7 +173,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
// Tell cassandra where the configuration files are.
// Use the test configuration file.
- System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath());
+ System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
if (EMBEDDED) {
cassandra = new EmbeddedCassandraService();
@@ -160,13 +194,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@Before
public void checkIfIgnore() {
- String runtime = System.getProperty("java.runtime.name");
- String version = System.getProperty("java.runtime.version");
- LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version);
- // The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues.
- // Oracle JDK identifies itself as "Java(TM) SE Runtime Environment"
- // OpenJDK is "OpenJDK Runtime Environment"
- Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7"));
+
}
@After
@@ -176,13 +204,23 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@AfterClass
public static void closeCassandra() {
- session.executeAsync(DROP_KEYSPACE_QUERY);
- session.close();
- cluster.close();
- if (EMBEDDED) {
+ if (session != null) {
+ session.executeAsync(DROP_KEYSPACE_QUERY);
+ session.close();
+ }
+
+ if (cluster != null) {
+ cluster.close();
+ }
+
+ if (cassandra != null) {
cassandra.stop();
}
- tmpDir.delete();
+
+ if (tmpDir != null) {
+ //noinspection ResultOfMethodCallIgnored
+ tmpDir.delete();
+ }
}
//=====Exactly-Once=================================================================================================
@@ -202,7 +240,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@Override
protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
- return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID);
+ return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
}
@Override
@@ -379,7 +417,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
- new TupleTypeInfo(Tuple3.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+ TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
long count = inputDS.count();
[6/8] flink git commit: [hotfix] Remove various '.hidden' files that
seem to have been accidentally committed
Posted by se...@apache.org.
[hotfix] Remove various '.hidden' files that seem to have been accidentally committed
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb98256c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb98256c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb98256c
Branch: refs/heads/master
Commit: cb98256cfc8627aa9b26d8614d23a76d44fb010b
Parents: f52d11a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 12:17:35 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/contrib/.hidden | 0
flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden | 0
.../src/test/java/org/apache/flink/contrib/.hidden | 0
3 files changed, 0 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cb98256c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/flink/blob/cb98256c/flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden b/flink-contrib/flink-tweet-inputformat/src/main/resources/.hidden
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/flink/blob/cb98256c/flink-contrib/flink-tweet-inputformat/src/test/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/test/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-tweet-inputformat/src/test/java/org/apache/flink/contrib/.hidden
deleted file mode 100644
index e69de29..0000000
[4/8] flink git commit: [hotfix] [docs] Remove obsolete file
'plotPoints.py'
Posted by se...@apache.org.
[hotfix] [docs] Remove obsolete file 'plotPoints.py'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8847955f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8847955f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8847955f
Branch: refs/heads/master
Commit: 8847955fac2f98773d1bc86feac247dd93d91e61
Parents: 7ea9c01
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 10:01:35 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200
----------------------------------------------------------------------
docs/quickstart/plotPoints.py | 100 -------------------------------------
1 file changed, 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8847955f/docs/quickstart/plotPoints.py
----------------------------------------------------------------------
diff --git a/docs/quickstart/plotPoints.py b/docs/quickstart/plotPoints.py
deleted file mode 100755
index 9ddf5dd..0000000
--- a/docs/quickstart/plotPoints.py
+++ /dev/null
@@ -1,100 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import matplotlib.pyplot as plt
-import csv
-import os
-
-if len(sys.argv) < 4 or not sys.argv[1] in ['points', 'result']:
- print "Usage: plot-clusters.py (points|result) <src-file> <pdf-file-prefix>"
- sys.exit(1)
-
-inFile = sys.argv[1]
-inFile = sys.argv[2]
-outFilePx = sys.argv[3]
-
-inFileName = os.path.splitext(os.path.basename(inFile))[0]
-outFile = os.path.join(".", outFilePx+"-plot.pdf")
-
-########### READ DATA
-
-cs = []
-xs = []
-ys = []
-
-minX = None
-maxX = None
-minY = None
-maxY = None
-
-if sys.argv[1] == 'points':
-
- with open(inFile, 'rb') as file:
- for line in file:
- # parse data
- csvData = line.strip().split(' ')
-
- x = float(csvData[0])
- y = float(csvData[1])
-
- if not minX or minX > x:
- minX = x
- if not maxX or maxX < x:
- maxX = x
- if not minY or minY > y:
- minY = y
- if not maxY or maxY < y:
- maxY = y
-
- xs.append(x)
- ys.append(y)
-
- # plot data
- plt.clf()
- plt.scatter(xs, ys, s=25, c="#999999", edgecolors='None', alpha=1.0)
- plt.ylim([minY,maxY])
- plt.xlim([minX,maxX])
-
-elif sys.argv[1] == 'result':
-
- with open(inFile, 'rb') as file:
- for line in file:
- # parse data
- csvData = line.strip().split(' ')
-
- c = int(csvData[0])
- x = float(csvData[1])
- y = float(csvData[2])
-
- cs.append(c)
- xs.append(x)
- ys.append(y)
-
- # plot data
- plt.clf()
- plt.scatter(xs, ys, s=25, c=cs, edgecolors='None', alpha=1.0)
- plt.ylim([minY,maxY])
- plt.xlim([minX,maxX])
-
-
-plt.savefig(outFile, dpi=600)
-print "\nPlotted file: %s" % outFile
-
-sys.exit(0)
\ No newline at end of file
[3/8] flink git commit: [FLINK-4285] [docs] Update the setup
quickstart guide with the new SocketWindowWordCount example
Posted by se...@apache.org.
[FLINK-4285] [docs] Update the setup quickstart guide with the new SocketWindowWordCount example
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba12a751
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba12a751
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba12a751
Branch: refs/heads/master
Commit: ba12a7514ca7f4548cc9cb166844dd6308138672
Parents: 8847955
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 11:22:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:12 2016 +0200
----------------------------------------------------------------------
docs/quickstart/setup_quickstart.md | 8 +++-----
.../streaming/examples/socket/SocketWindowWordCount.java | 2 +-
.../scala/examples/socket/SocketWindowWordCount.scala | 2 +-
3 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ba12a751/docs/quickstart/setup_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/setup_quickstart.md b/docs/quickstart/setup_quickstart.md
index 261f14f..2c3e6fd 100644
--- a/docs/quickstart/setup_quickstart.md
+++ b/docs/quickstart/setup_quickstart.md
@@ -56,7 +56,7 @@ Check the __JobManager's web frontend__ at [http://localhost:8081](http://localh
## Run Example
-Now, we are going to run the [SocketTextStreamWordCount example](https://github.com/apache/flink/blob/release-1.0.0/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java) and read text from a socket and count the number of distinct words.
+Now, we are going to run the [SocketWindowWordCount example](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java) and read text from a socket and count the number of distinct words.
* First of all, we use **netcat** to start local server via
@@ -67,10 +67,8 @@ Now, we are going to run the [SocketTextStreamWordCount example](https://github.
* Submit the Flink program:
~~~bash
- $ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \
- --hostname localhost \
- --port 9000
- Printing result to stdout. Use --output to specify output path.
+ $ bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
+
03/08/2016 17:21:56 Job execution switched to status RUNNING.
03/08/2016 17:21:56 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
03/08/2016 17:21:56 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
http://git-wip-us.apache.org/repos/asf/flink/blob/ba12a751/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index 10e8ca0..d6cbe87 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -48,7 +48,7 @@ public class SocketWindowWordCount {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
- System.err.println("No port specified. Please run 'WindowWordCount --port <port>', " +
+ System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
"where port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text " +
"into the command line");
http://git-wip-us.apache.org/repos/asf/flink/blob/ba12a751/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index e942bb5..3b432ec 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -43,7 +43,7 @@ object SocketWindowWordCount {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
- System.err.println("No port specified. Please run 'WindowWordCount --port <port>', " +
+ System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
"where port is the address of the text server")
System.err.println("To start a simple text server, run 'netcat -l <port>' " +
"and type the input text into the command line")
[7/8] flink git commit: [FLINK-4298] [storm compatibility] Clean up
unnecessary dependencies in 'flink-storm'
Posted by se...@apache.org.
[FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea2596e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea2596e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea2596e
Branch: refs/heads/master
Commit: 0ea2596e1b605c1eedb843273660ef1366463313
Parents: 4456453
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 15:58:12 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200
----------------------------------------------------------------------
flink-contrib/flink-storm/pom.xml | 83 +++++++-------------
.../org/apache/flink/storm/api/FlinkClient.java | 5 +-
.../flink/storm/wrappers/BoltWrapper.java | 19 +++--
.../storm/wrappers/FlinkTopologyContext.java | 3 +-
.../storm/wrappers/MergedInputsBoltWrapper.java | 6 +-
.../flink/storm/wrappers/SpoutWrapper.java | 8 +-
.../storm/wrappers/WrapperSetupHelperTest.java | 12 ++-
7 files changed, 60 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 590f33d..0ac49db 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -61,18 +61,40 @@ under the License.
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
- <artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ring</groupId>
+ <artifactId>ring-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ring</groupId>
+ <artifactId>ring-devel</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ring</groupId>
+ <artifactId>ring-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ring</groupId>
+ <artifactId>ring-jetty-adapter</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jgrapht</groupId>
+ <artifactId>jgrapht-core</artifactId>
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
<!-- test dependencies -->
<dependency>
@@ -85,51 +107,4 @@ under the License.
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[2.9,)</versionRange>
- <goals>
- <goal>unpack</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
- </build>
-
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 9628bb7..f4bcfb7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
+
import backtype.storm.Config;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
@@ -32,7 +33,6 @@ import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import com.esotericsoftware.kryo.Serializer;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -215,7 +216,7 @@ public class FlinkClient {
try {
ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
- Lists.newArrayList(uploadedJarUrl),
+ Collections.<URL>singletonList(uploadedJarUrl),
Collections.<URL>emptyList(),
this.getClass().getClassLoader());
client.runDetached(jobGraph, classLoader);
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index 5311cb3..6e316e7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.wrappers;
import backtype.storm.generated.GlobalStreamId;
@@ -26,8 +27,6 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.MessageId;
import backtype.storm.utils.Utils;
-import com.google.common.collect.Sets;
-
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
@@ -44,6 +43,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import static java.util.Arrays.asList;
+
/**
* A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
* It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
@@ -135,9 +136,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* {@code rawOuput} is {@code false} and the number of declared output attributes is not within range
* [1;25].
*/
- public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
+ public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
- this(bolt, null, Sets.newHashSet(rawOutputs));
+ this(bolt, null, asList(rawOutputs));
}
/**
@@ -157,8 +158,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [1;25].
*/
- public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
- throws IllegalArgumentException {
+ public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, null, rawOutputs);
}
@@ -181,9 +181,12 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [0;25].
*/
- public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final String[] rawOutputs)
+ public BoltWrapper(
+ final IRichBolt bolt,
+ final Fields inputSchema,
+ final String[] rawOutputs)
throws IllegalArgumentException {
- this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
+ this(bolt, inputSchema, asList(rawOutputs));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index db1d147..52d39a7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -120,9 +120,8 @@ final class FlinkTopologyContext extends TopologyContext {
* @throws UnsupportedOperationException
* at every invocation
*/
- @SuppressWarnings("unchecked")
@Override
- public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
+ public <T extends IMetric> T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) {
throw new UnsupportedOperationException("Metrics are not supported by Flink");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
index 89defde..7a3b6d5 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.wrappers;
import backtype.storm.topology.IRichBolt;
-import com.google.common.collect.Sets;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
@@ -26,6 +26,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collection;
+import static java.util.Arrays.asList;
+
/**
* A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects input tuples of type {@link StormTuple}. It
* can be used to wrap a multi-input bolt and assumes that all input stream got merged into a {@link StormTuple} stream
@@ -67,7 +69,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup
*/
public MergedInputsBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
- super(bolt, Sets.newHashSet(rawOutputs));
+ super(bolt, asList(rawOutputs));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 66b05c6..c171ccc 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -22,8 +22,6 @@ import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
-import com.google.common.collect.Sets;
-
import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.java.tuple.Tuple0;
@@ -37,6 +35,8 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import java.util.Collection;
import java.util.HashMap;
+import static java.util.Arrays.asList;
+
/**
* A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
* takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -121,7 +121,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp
*/
public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
throws IllegalArgumentException {
- this(spout, Sets.newHashSet(rawOutputs), null);
+ this(spout, asList(rawOutputs), null);
}
/**
@@ -147,7 +147,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp
*/
public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
final Integer numberOfInvocations) throws IllegalArgumentException {
- this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations);
+ this(spout, asList(rawOutputs), numberOfInvocations);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 82b12d6..000fe84 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.wrappers;
import backtype.storm.Config;
@@ -27,16 +28,18 @@ import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
-import com.google.common.collect.Sets;
+
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
import org.apache.flink.storm.util.TestSink;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -48,6 +51,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import static java.util.Collections.singleton;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -85,7 +90,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
WrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
- Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
+ new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)));
}
@Test(expected = IllegalArgumentException.class)
@@ -143,8 +148,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
Assert.assertEquals(attributes, WrapperSetupHelper.getNumberOfAttributes(
boltOrSpout,
- numberOfAttributes == -1 ? Sets
- .newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
+ numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null));
}
@Test
[5/8] flink git commit: [hotfix] [build] Update LICENSE file with new
URL of simplejmx dependency
Posted by se...@apache.org.
[hotfix] [build] Update LICENSE file with new URL of simplejmx dependency
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4456453b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4456453b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4456453b
Branch: refs/heads/master
Commit: 4456453b5ce7c706529ad06a18dc83bb1238ceef
Parents: cb98256
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 14:15:40 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200
----------------------------------------------------------------------
LICENSE | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4456453b/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index dcfc16c..07df05f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -305,9 +305,11 @@ Open Font License (OFT) - http://scripts.sil.org/OFL
-----------------------------------------------------------------------
The ISC License
-----------------------------------------------------------------------
-
- The Apache Flink project contains code under the ISC license from the following files:
- - simplejmx (http://256.com/sources/simplejmx/) Copyright (c) - Gray Watson
+
+The Apache Flink project contains or reuses code that is licensed under the ISC license
+from the following projects:
+
+ - simplejmx (http://256stuff.com/sources/simplejmx/) Copyright (c) - Gray Watson
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby
granted, provided that this permission notice appear in all copies.