You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/28 13:32:18 UTC

[1/4] flink git commit: [FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}

Repository: flink
Updated Branches:
  refs/heads/master 652309c3e -> 354201930


[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}

This closes #2875


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35420193
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35420193
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35420193

Branch: refs/heads/master
Commit: 3542019305cfd5b47204b7858a9b19718b0cb6db
Parents: 3d13a05
Author: shijinkui <sh...@huawei.com>
Authored: Sat Nov 26 16:14:13 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100

----------------------------------------------------------------------
 .../src/main/scala/org/apache/flink/graph/scala/Graph.scala    | 2 +-
 .../flink/graph/scala/test/GellyScalaAPICompletenessTest.scala | 2 +-
 .../runtime/clusterframework/RegisteredMesosWorkerNode.scala   | 2 +-
 .../org/apache/flink/streaming/api/scala/DataStream.scala      | 2 +-
 .../org/apache/flink/streaming/api/scala/SplitStream.scala     | 6 +++---
 .../flink/streaming/api/scala/StreamExecutionEnvironment.scala | 5 ++---
 .../flink/streaming/api/scala/StreamingOperatorsITCase.scala   | 4 ++--
 7 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 4dd9d12..27bc548 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -294,7 +294,7 @@ object Graph {
 }
 
 /**
- * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
+ * Represents a graph consisting of [[Edge]] edges and [[Vertex]] vertices.
  *
  * @param jgraph the underlying java api Graph.
  * @tparam K the key type for vertex and edge identifiers

http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
index d7ab1dd..034bf77 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -26,7 +26,7 @@ import org.junit.Test
 
 /**
  * This checks whether the Gelly Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for Gelly.
+ * Implements the [[ScalaAPICompletenessTestBase]] for Gelly.
  */
 class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
index 59764ef..7ca388f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
@@ -22,7 +22,7 @@ import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore
 import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
 
 /**
-  * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
+  * A representation of a registered Mesos task managed by the [[MesosFlinkResourceManager]].
   */
 case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 4fe73e9..dbc91bd 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -697,7 +697,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * For the second case and when the watermarks are required to lag behind the maximum
    * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
    * amount is known in advance, use the
-   * {@link org.apache.flink.streaming.api.functions.TimestampExtractorWithFixedAllowedLateness}.
+   * [[org.apache.flink.streaming.api.functions.TimestampExtractorWithFixedAllowedLateness]].
    *
    * For cases where watermarks should be created in an irregular fashion, for example
    * based on certain markers that some element carry, use the

http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
index 0b9ac69..ca4bcc0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
@@ -23,10 +23,10 @@ import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStrea
 
 /**
  * The SplitStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply a transformation on the whole output simply call
+ * [[org.apache.flink.streaming.api.collector.selector.OutputSelector]].
+ * Named outputs can be selected using the [[SplitStream#select()]] function.
+ * To apply a transformation on the whole output simply call
  * the appropriate method on this stream.
- *
  */
 @Public
 class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){

http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2e432ba..432e8ac 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -502,9 +502,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     *          The files to be excluded from the processing
     * @return The data stream that represents the data read from the given file
     *
-    * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
-    *         {@link StreamExecutionEnvironment#readFile(FileInputFormat,
-      *              String, FileProcessingMode, long)}
+    * @deprecated Use [[FileInputFormat#setFilesFilter(FilePathFilter)]] to set a filter and
+    * [[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]
     */
   @PublicEvolving
   @Deprecated

http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index c57c29c..e08e0b5 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -58,8 +58,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
     * The stream is grouped by the first field. For each group, the resulting stream is folded by
     * summing up the second tuple field.
     *
-    * This test relies on the hash function used by the {@link DataStream#keyBy}, which is
-    * assumed to be {@link MathUtils#murmurHash}.
+    * This test relies on the hash function used by the [[DataStream#keyBy]], which is
+    * assumed to be [[MathUtils#murmurHash]].
     */
   @Test
   def testGroupedFoldOperator(): Unit = {


[2/4] flink git commit: [hotfix] [tests] Harden timeout logic for TaskManager registration in AbstractTaskManagerProcessFailureRecoveryTest

Posted by se...@apache.org.
[hotfix] [tests] Harden timeout logic for TaskManager registration in AbstractTaskManagerProcessFailureRecoveryTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e76322c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e76322c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e76322c

Branch: refs/heads/master
Commit: 8e76322cb73a19b4dcb46ba0d1a2ee01d4aeaa9f
Parents: 973ce7d
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 23 15:54:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100

----------------------------------------------------------------------
 ...ctTaskManagerProcessFailureRecoveryTest.java | 24 ++++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e76322c/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 0ff2e78..3acf5bb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -273,25 +273,25 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 	public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
 
 
-	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis)
 			throws Exception
 	{
-		final long deadline = System.currentTimeMillis() + maxDelay;
-		while (true) {
-			long remaining = deadline - System.currentTimeMillis();
-			if (remaining <= 0) {
-				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
-			}
+		final long interval = maxDelayMillis * 1_000_000;
+		final long deadline = System.nanoTime() + interval;
+		long remaining = interval;
 
-			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+		while (remaining > 0) {
+			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.NANOSECONDS);
 
 			try {
 				Future<?> result = Patterns.ask(jobManager,
 						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
 						new Timeout(timeout));
-				Integer numTMs = (Integer) Await.result(result, timeout);
+
+				int numTMs = (Integer) Await.result(result, timeout);
+
 				if (numTMs == numExpected) {
-					break;
+					return;
 				}
 			}
 			catch (TimeoutException e) {
@@ -300,7 +300,11 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			catch (ClassCastException e) {
 				fail("Wrong response: " + e.getMessage());
 			}
+
+			remaining = deadline - System.nanoTime();
 		}
+
+		fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)");
 	}
 
 	protected static void printProcessLog(String processName, String log) {


[3/4] flink git commit: [hotfix] Flush in CsvOutputFormat before closing, to increase CI stability

Posted by se...@apache.org.
[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d13a05d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d13a05d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d13a05d

Branch: refs/heads/master
Commit: 3d13a05d1b2354e027626db280f9bfce9070e570
Parents: 8e76322
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 23 15:37:05 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/CsvOutputFormat.java      |  1 +
 .../flink/api/java/io/CsvOutputFormatTest.java  | 47 +++++++++++++-------
 2 files changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index 703128f..c2fe13c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -165,6 +165,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	@Override
 	public void close() throws IOException {
 		if (wrt != null) {
+			this.wrt.flush();
 			this.wrt.close();
 		}
 		super.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
index a9288c6..a8ce495 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -34,25 +35,30 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
 
+import static org.junit.Assert.fail;
+
 public class CsvOutputFormatTest {
 
 	private String path = null;
-	private CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat;
 
 	@Before
 	public void createFile() throws Exception {
 		path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
-		csvOutputFormat = new CsvOutputFormat<>(new Path(path));
 	}
 
 	@Test
 	public void testNullAllow() throws Exception {
-		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
-		csvOutputFormat.setAllowNullValues(true);
-		csvOutputFormat.open(0, 1);
-		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
-		csvOutputFormat.close();
+		final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+		try {
+			csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+			csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+			csvOutputFormat.setAllowNullValues(true);
+			csvOutputFormat.open(0, 1);
+			csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+		}
+		finally {
+			csvOutputFormat.close();
+		}
 
 		java.nio.file.Path p = Paths.get(path);
 		Assert.assertTrue(Files.exists(p));
@@ -61,19 +67,28 @@ public class CsvOutputFormatTest {
 		Assert.assertEquals("One,,8", lines.get(0));
 	}
 
-	@Test(expected = RuntimeException.class)
+	@Test
 	public void testNullDisallowOnDefault() throws Exception {
-		csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-		csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
-		csvOutputFormat.open(0, 1);
-		csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
-		csvOutputFormat.close();
+		final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+		try {
+			csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+			csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+			csvOutputFormat.open(0, 1);
+			try {
+				csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+				fail("should fail with an exception");
+			} catch (RuntimeException e) {
+				// expected
+			}
+			
+		}
+		finally {
+			csvOutputFormat.close();
+		}
 	}
 
 	@After
 	public void cleanUp() throws IOException {
-		csvOutputFormat.close();
 		Files.deleteIfExists(Paths.get(path));
 	}
-
 }


[4/4] flink git commit: [FLINK-5050] [build] Remove transitive JSON.org dependency

Posted by se...@apache.org.
[FLINK-5050] [build] Remove transitive JSON.org dependency

This transitive dependency has an incompatible license.

This closes #2824


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/973ce7d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/973ce7d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/973ce7d0

Branch: refs/heads/master
Commit: 973ce7d0eb219ea84ad6b7fd0d063f595485205f
Parents: 652309c
Author: sergey_sokur <so...@gmail.com>
Authored: Thu Nov 17 19:28:20 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100

----------------------------------------------------------------------
 flink-batch-connectors/flink-hcatalog/pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/973ce7d0/flink-batch-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml
index 444bd9a..6889e5a 100644
--- a/flink-batch-connectors/flink-hcatalog/pom.xml
+++ b/flink-batch-connectors/flink-hcatalog/pom.xml
@@ -52,6 +52,12 @@ under the License.
 			<groupId>org.apache.hive.hcatalog</groupId>
 			<artifactId>hcatalog-core</artifactId>
 			<version>0.12.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.json</groupId>
+					<artifactId>json</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<dependency>