You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/28 13:32:18 UTC
[1/4] flink git commit: [FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}
Repository: flink
Updated Branches:
refs/heads/master 652309c3e -> 354201930
[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}
This closes #2875
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35420193
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35420193
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35420193
Branch: refs/heads/master
Commit: 3542019305cfd5b47204b7858a9b19718b0cb6db
Parents: 3d13a05
Author: shijinkui <sh...@huawei.com>
Authored: Sat Nov 26 16:14:13 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100
----------------------------------------------------------------------
.../src/main/scala/org/apache/flink/graph/scala/Graph.scala | 2 +-
.../flink/graph/scala/test/GellyScalaAPICompletenessTest.scala | 2 +-
.../runtime/clusterframework/RegisteredMesosWorkerNode.scala | 2 +-
.../org/apache/flink/streaming/api/scala/DataStream.scala | 2 +-
.../org/apache/flink/streaming/api/scala/SplitStream.scala | 6 +++---
.../flink/streaming/api/scala/StreamExecutionEnvironment.scala | 5 ++---
.../flink/streaming/api/scala/StreamingOperatorsITCase.scala | 4 ++--
7 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 4dd9d12..27bc548 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -294,7 +294,7 @@ object Graph {
}
/**
- * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
+ * Represents a graph consisting of [[Edge]] edges and [[Vertex]] vertices.
*
* @param jgraph the underlying java api Graph.
* @tparam K the key type for vertex and edge identifiers
http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
index d7ab1dd..034bf77 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -26,7 +26,7 @@ import org.junit.Test
/**
* This checks whether the Gelly Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for Gelly.
+ * Implements the [[ScalaAPICompletenessTestBase]] for Gelly.
*/
class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
index 59764ef..7ca388f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala
@@ -22,7 +22,7 @@ import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore
import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
/**
- * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
+ * A representation of a registered Mesos task managed by the [[MesosFlinkResourceManager]].
*/
case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable {
http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 4fe73e9..dbc91bd 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -697,7 +697,7 @@ class DataStream[T](stream: JavaStream[T]) {
* For the second case and when the watermarks are required to lag behind the maximum
* timestamp seen so far in the elements of the stream by a fixed amount of time, and this
* amount is known in advance, use the
- * {@link org.apache.flink.streaming.api.functions.TimestampExtractorWithFixedAllowedLateness}.
+ * [[org.apache.flink.streaming.api.functions.TimestampExtractorWithFixedAllowedLateness]].
*
* For cases where watermarks should be created in an irregular fashion, for example
* based on certain markers that some element carry, use the
http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
index 0b9ac69..ca4bcc0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
@@ -23,10 +23,10 @@ import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStrea
/**
* The SplitStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply a transformation on the whole output simply call
+ * [[org.apache.flink.streaming.api.collector.selector.OutputSelector]].
+ * Named outputs can be selected using the [[SplitStream#select()]] function.
+ * To apply a transformation on the whole output simply call
* the appropriate method on this stream.
- *
*/
@Public
class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2e432ba..432e8ac 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -502,9 +502,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* The files to be excluded from the processing
* @return The data stream that represents the data read from the given file
*
- * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
- * {@link StreamExecutionEnvironment#readFile(FileInputFormat,
- * String, FileProcessingMode, long)}
+ * @deprecated Use [[FileInputFormat#setFilesFilter(FilePathFilter)]] to set a filter and
+ * [[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]
*/
@PublicEvolving
@Deprecated
http://git-wip-us.apache.org/repos/asf/flink/blob/35420193/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index c57c29c..e08e0b5 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -58,8 +58,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
* The stream is grouped by the first field. For each group, the resulting stream is folded by
* summing up the second tuple field.
*
- * This test relies on the hash function used by the {@link DataStream#keyBy}, which is
- * assumed to be {@link MathUtils#murmurHash}.
+ * This test relies on the hash function used by the [[DataStream#keyBy]], which is
+ * assumed to be [[MathUtils#murmurHash]].
*/
@Test
def testGroupedFoldOperator(): Unit = {
[2/4] flink git commit: [hotfix] [tests] Harden timeout logic for TaskManager registration in AbstractTaskManagerProcessFailureRecoveryTest
Posted by se...@apache.org.
[hotfix] [tests] Harden timeout logic for TaskManager registration in AbstractTaskManagerProcessFailureRecoveryTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e76322c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e76322c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e76322c
Branch: refs/heads/master
Commit: 8e76322cb73a19b4dcb46ba0d1a2ee01d4aeaa9f
Parents: 973ce7d
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 23 15:54:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100
----------------------------------------------------------------------
...ctTaskManagerProcessFailureRecoveryTest.java | 24 ++++++++++++--------
1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e76322c/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 0ff2e78..3acf5bb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -273,25 +273,25 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
- protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+ protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis)
throws Exception
{
- final long deadline = System.currentTimeMillis() + maxDelay;
- while (true) {
- long remaining = deadline - System.currentTimeMillis();
- if (remaining <= 0) {
- fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
- }
+ final long interval = maxDelayMillis * 1_000_000;
+ final long deadline = System.nanoTime() + interval;
+ long remaining = interval;
- FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+ while (remaining > 0) {
+ FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.NANOSECONDS);
try {
Future<?> result = Patterns.ask(jobManager,
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
new Timeout(timeout));
- Integer numTMs = (Integer) Await.result(result, timeout);
+
+ int numTMs = (Integer) Await.result(result, timeout);
+
if (numTMs == numExpected) {
- break;
+ return;
}
}
catch (TimeoutException e) {
@@ -300,7 +300,11 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
catch (ClassCastException e) {
fail("Wrong response: " + e.getMessage());
}
+
+ remaining = deadline - System.nanoTime();
}
+
+ fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)");
}
protected static void printProcessLog(String processName, String log) {
[3/4] flink git commit: [hotfix] Flush in CsvOutputFormat before closing, to increase CI stability
Posted by se...@apache.org.
[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d13a05d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d13a05d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d13a05d
Branch: refs/heads/master
Commit: 3d13a05d1b2354e027626db280f9bfce9070e570
Parents: 8e76322
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 23 15:37:05 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100
----------------------------------------------------------------------
.../flink/api/java/io/CsvOutputFormat.java | 1 +
.../flink/api/java/io/CsvOutputFormatTest.java | 47 +++++++++++++-------
2 files changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index 703128f..c2fe13c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -165,6 +165,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
@Override
public void close() throws IOException {
if (wrt != null) {
+ this.wrt.flush();
this.wrt.close();
}
super.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/3d13a05d/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
index a9288c6..a8ce495 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -34,25 +35,30 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
+import static org.junit.Assert.fail;
+
public class CsvOutputFormatTest {
private String path = null;
- private CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat;
@Before
public void createFile() throws Exception {
path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
- csvOutputFormat = new CsvOutputFormat<>(new Path(path));
}
@Test
public void testNullAllow() throws Exception {
- csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
- csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
- csvOutputFormat.setAllowNullValues(true);
- csvOutputFormat.open(0, 1);
- csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
- csvOutputFormat.close();
+ final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+ try {
+ csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+ csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+ csvOutputFormat.setAllowNullValues(true);
+ csvOutputFormat.open(0, 1);
+ csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+ }
+ finally {
+ csvOutputFormat.close();
+ }
java.nio.file.Path p = Paths.get(path);
Assert.assertTrue(Files.exists(p));
@@ -61,19 +67,28 @@ public class CsvOutputFormatTest {
Assert.assertEquals("One,,8", lines.get(0));
}
- @Test(expected = RuntimeException.class)
+ @Test
public void testNullDisallowOnDefault() throws Exception {
- csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
- csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
- csvOutputFormat.open(0, 1);
- csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
- csvOutputFormat.close();
+ final CsvOutputFormat<Tuple3<String, String, Integer>> csvOutputFormat = new CsvOutputFormat<>(new Path(path));
+ try {
+ csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+ csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY);
+ csvOutputFormat.open(0, 1);
+ try {
+ csvOutputFormat.writeRecord(new Tuple3<String, String, Integer>("One", null, 8));
+ fail("should fail with an exception");
+ } catch (RuntimeException e) {
+ // expected
+ }
+
+ }
+ finally {
+ csvOutputFormat.close();
+ }
}
@After
public void cleanUp() throws IOException {
- csvOutputFormat.close();
Files.deleteIfExists(Paths.get(path));
}
-
}
[4/4] flink git commit: [FLINK-5050] [build] Remove transitive JSON.org dependency
Posted by se...@apache.org.
[FLINK-5050] [build] Remove transitive JSON.org dependency
This transitive dependency has an incompatible license.
This closes #2824
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/973ce7d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/973ce7d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/973ce7d0
Branch: refs/heads/master
Commit: 973ce7d0eb219ea84ad6b7fd0d063f595485205f
Parents: 652309c
Author: sergey_sokur <so...@gmail.com>
Authored: Thu Nov 17 19:28:20 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 28 14:20:08 2016 +0100
----------------------------------------------------------------------
flink-batch-connectors/flink-hcatalog/pom.xml | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/973ce7d0/flink-batch-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml
index 444bd9a..6889e5a 100644
--- a/flink-batch-connectors/flink-hcatalog/pom.xml
+++ b/flink-batch-connectors/flink-hcatalog/pom.xml
@@ -52,6 +52,12 @@ under the License.
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hcatalog-core</artifactId>
<version>0.12.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>