Posted to commits@flink.apache.org by tr...@apache.org on 2018/03/28 06:12:24 UTC

[1/5] flink git commit: [hotfix][tests] add a name to the parameter of RescalingITCase

Repository: flink
Updated Branches:
  refs/heads/release-1.5 9b12dcdc3 -> dcec0820d


[hotfix][tests] add a name to the parameter of RescalingITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8dec5bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8dec5bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8dec5bb

Branch: refs/heads/release-1.5
Commit: c8dec5bb24e715ae7f965fa7fc1eea44849d1e0a
Parents: 9b12dcd
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Mar 22 13:49:45 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Mar 28 08:12:04 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/test/checkpointing/RescalingITCase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8dec5bb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index e4f4389..55631a2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -90,7 +90,7 @@ public class RescalingITCase extends TestLogger {
 	private static final int slotsPerTaskManager = 2;
 	private static final int numSlots = numTaskManagers * slotsPerTaskManager;
 
-	@Parameterized.Parameters
+	@Parameterized.Parameters(name = "backend = {0}")
 	public static Object[] data() {
 		return new Object[]{"filesystem", "rocksdb"};
 	}
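
For context, the name template only changes how JUnit labels the runs: {0} is replaced by the
first parameter, so the test appears as [backend = filesystem] and [backend = rocksdb] instead
of the default [0] and [1]. A minimal, self-contained sketch of the same pattern (the test class
below is illustrative, not Flink code):

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    import static org.junit.Assert.assertNotNull;

    @RunWith(Parameterized.class)
    public class BackendNameExampleTest {

        // {0} refers to the first constructor parameter, so each run is labelled
        // "[backend = filesystem]" or "[backend = rocksdb]" in reports and logs.
        @Parameterized.Parameters(name = "backend = {0}")
        public static Object[] data() {
            return new Object[]{"filesystem", "rocksdb"};
        }

        private final String backend;

        public BackendNameExampleTest(String backend) {
            this.backend = backend;
        }

        @Test
        public void testBackendIsSet() {
            assertNotNull(backend);
        }
    }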


[3/5] flink git commit: [FLINK-9057][network] fix an NPE when cleaning up before requesting a subpartition view

Posted by tr...@apache.org.
[FLINK-9057][network] fix an NPE when cleaning up before requesting a subpartition view

In PartitionRequestServerHandler, the view reader was created and immediately
afterwards added to the PartitionRequestQueue, which may attempt a cleanup of
the view reader's subpartition view. That view, however, was only created after
the reader had been added to the PartitionRequestQueue, so a cleanup happening
very early in the initialization phase, e.g. due to failures, could run into a
NullPointerException.

This closes #5747.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1df3108
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1df3108
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1df3108

Branch: refs/heads/release-1.5
Commit: f1df310814b6a17c339e91c652d6cb6efee977fc
Parents: c8dec5b
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Mar 22 13:50:07 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Mar 28 08:12:05 2018 +0200

----------------------------------------------------------------------
 .../io/network/netty/PartitionRequestServerHandler.java        | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1df3108/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index c6a8b1a..e9ee10c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
-import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -99,12 +99,12 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 							outboundQueue);
 					}
 
-					outboundQueue.notifyReaderCreated(reader);
-
 					reader.requestSubpartitionView(
 						partitionProvider,
 						request.partitionId,
 						request.queueIndex);
+
+					outboundQueue.notifyReaderCreated(reader);
 				} catch (PartitionNotFoundException notFound) {
 					respondWithError(ctx, notFound, request.receiverId);
 				}
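
The reordering follows a general safe-publication rule: do not hand an object to a component
that may concurrently clean it up before the object is fully initialized. A minimal, hypothetical
sketch of the pattern (Reader and OutboundQueue below are illustrative stand-ins, not Flink's
actual classes):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class PublishAfterInitExample {

        /** Illustrative stand-in for the view reader; its view is null until requested. */
        static class Reader {
            private volatile Object view;

            void requestView() {
                view = new Object();
            }

            /** Called during cleanup; dereferencing a null view would throw an NPE. */
            void releaseView() {
                view.hashCode();
            }
        }

        /** Illustrative stand-in for the outbound queue, which may release readers early. */
        static class OutboundQueue {
            private final Queue<Reader> readers = new ConcurrentLinkedQueue<>();

            void notifyReaderCreated(Reader reader) {
                readers.add(reader);
            }

            void releaseAllReaders() {
                for (Reader r : readers) {
                    r.releaseView();
                }
            }
        }

        public static void main(String[] args) {
            OutboundQueue queue = new OutboundQueue();
            Reader reader = new Reader();

            // Order matters (as in the patch): initialize the view first, then publish the
            // reader. Publishing first would let an early releaseAllReaders() observe a
            // reader whose view is still null.
            reader.requestView();
            queue.notifyReaderCreated(reader);
            queue.releaseAllReaders();
        }
    }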


[2/5] flink git commit: [FLINK-8941][tests] use TestLogger and TemporaryFolder in SpanningRecordSerializationTest

Posted by tr...@apache.org.
[FLINK-8941][tests] use TestLogger and TemporaryFolder in SpanningRecordSerializationTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a060d41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a060d41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a060d41

Branch: refs/heads/release-1.5
Commit: 7a060d4191729b2c6cd1d0c8ba6044d22618b349
Parents: f1df310
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Mar 16 11:43:43 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Mar 28 08:12:05 2018 +0200

----------------------------------------------------------------------
 .../serialization/SpanningRecordSerializationTest.java  | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a060d41/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 16b77e6..2e1063f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -27,9 +27,12 @@ import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -43,9 +46,12 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 /**
  * Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer}.
  */
-public class SpanningRecordSerializationTest {
+public class SpanningRecordSerializationTest extends TestLogger {
 	private static final Random RANDOM = new Random(42);
 
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
 	@Test
 	public void testIntRecordsSpanningMultipleSegments() throws Exception {
 		final int segmentSize = 1;
@@ -100,11 +106,11 @@ public class SpanningRecordSerializationTest {
 
 	// -----------------------------------------------------------------------------------------------------------------
 
-	private static void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize) throws Exception {
+	private void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize) throws Exception {
 		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
-				new String[]{System.getProperty("java.io.tmpdir")});
+				new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
 		testSerializationRoundTrip(records, segmentSize, serializer, deserializer);
 	}
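
For readers unfamiliar with the JUnit 4 rule added above, TemporaryFolder creates a fresh
directory before each test and deletes it recursively afterwards, which is why the deserializer
can now spill into tempFolder.getRoot() instead of java.io.tmpdir. A minimal standalone sketch
(not Flink code):

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    import java.io.File;
    import java.io.IOException;

    import static org.junit.Assert.assertTrue;

    public class TemporaryFolderExampleTest {

        // Created before each test method and deleted, including its contents, afterwards.
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();

        @Test
        public void testWritesIntoTemporaryFolder() throws IOException {
            // getRoot() returns the per-test directory; newFile() creates a file inside it.
            File spillDir = tempFolder.getRoot();
            File spillFile = tempFolder.newFile("record.spill");

            assertTrue(spillDir.isDirectory());
            assertTrue(spillFile.exists());
        }
    }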


[5/5] flink git commit: [FLINK-8415] Unprotected access to recordsToSend in LongRecordWriterThread#shutdown()

Posted by tr...@apache.org.
[FLINK-8415] Unprotected access to recordsToSend in LongRecordWriterThread#shutdown()

This closes #5724.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcec0820
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcec0820
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcec0820

Branch: refs/heads/release-1.5
Commit: dcec0820d56670b302835331f63efc81e537dc9e
Parents: 1bb9567
Author: yanghua <ya...@gmail.com>
Authored: Tue Mar 20 11:21:42 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Mar 28 08:12:06 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/io/benchmark/LongRecordWriterThread.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcec0820/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
index b93b867..05ae276 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -47,7 +47,7 @@ public class LongRecordWriterThread extends CheckedThread {
 		this.recordWriter = checkNotNull(recordWriter);
 	}
 
-	public void shutdown() {
+	public synchronized void shutdown() {
 		running = false;
 		recordsToSend.complete(0L);
 	}
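
The hunk adds the synchronized keyword so that shutdown()'s access to recordsToSend is guarded
by the object's monitor, presumably the same monitor used by the class's other accessors of that
field. A minimal, hypothetical sketch of that guarded-accessor pattern (the class below is
illustrative, not the actual LongRecordWriterThread):

    import java.util.concurrent.CompletableFuture;

    public class GuardedShutdownExample extends Thread {

        // Guarded by 'this': every access goes through a synchronized method.
        private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();

        public synchronized void setRecordsToSend(long records) {
            recordsToSend.complete(records);
        }

        private synchronized CompletableFuture<Long> getRecordsToSend() {
            return recordsToSend;
        }

        // Synchronized like the other accessors, so completing the future here cannot
        // interleave with an unsynchronized read or swap of the field.
        public synchronized void shutdown() {
            recordsToSend.complete(0L);
        }

        @Override
        public void run() {
            // Blocks until either setRecordsToSend() or shutdown() completes the future.
            long records = getRecordsToSend().join();
            System.out.println("records to send: " + records);
        }

        public static void main(String[] args) throws InterruptedException {
            GuardedShutdownExample writer = new GuardedShutdownExample();
            writer.start();
            writer.shutdown();
            writer.join();
        }
    }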


[4/5] flink git commit: [FLINK-8941][serialization] make sure we use unique spilling files

Posted by tr...@apache.org.
[FLINK-8941][serialization] make sure we use unique spilling files

Although the spilling files were given random names of 20 bytes, these names
could, in rare cases, collide. In that case, retry (at most 10 times) to select
a unique file name.

This closes #5709.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1bb9567f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1bb9567f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1bb9567f

Branch: refs/heads/release-1.5
Commit: 1bb9567f8ce53a7c4e03a2abc74989da7f1490bf
Parents: 7a060d4
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Mar 16 11:51:29 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Mar 28 08:12:05 2018 +0200

----------------------------------------------------------------------
 .../SpillingAdaptiveSpanningRecordDeserializer.java | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1bb9567f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index fded258..41ee03d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -36,6 +36,7 @@ import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
 import java.util.Random;
 
 /**
@@ -627,10 +628,19 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				throw new IllegalStateException("Spilling file already exists.");
 			}
 
-			String directory = tempDirs[rnd.nextInt(tempDirs.length)];
-			spillFile = new File(directory, randomString(rnd) + ".inputchannel");
+			// try to find a unique file name for the spilling channel
+			int maxAttempts = 10;
+			for (int attempt = 0; attempt < maxAttempts; attempt++) {
+				String directory = tempDirs[rnd.nextInt(tempDirs.length)];
+				spillFile = new File(directory, randomString(rnd) + ".inputchannel");
+				if (spillFile.createNewFile()) {
+					return new RandomAccessFile(spillFile, "rw").getChannel();
+				}
+			}
 
-			return new RandomAccessFile(spillFile, "rw").getChannel();
+			throw new IOException(
+				"Could not find a unique file channel name in '" + Arrays.toString(tempDirs) +
+					"' for spilling large records during deserialization.");
 		}
 
 		private static String randomString(Random random) {
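
The retry hinges on File.createNewFile(), which atomically creates the file only if no file of
that name exists yet and returns false on a collision. A minimal standalone sketch of the same
technique (the UUID-based names and the main() driver are illustrative simplifications, not the
actual Flink code):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.util.Arrays;
    import java.util.Random;
    import java.util.UUID;

    public class UniqueSpillFileExample {

        private static final int MAX_ATTEMPTS = 10;

        static FileChannel createSpillChannel(String[] tempDirs, Random rnd) throws IOException {
            for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
                String directory = tempDirs[rnd.nextInt(tempDirs.length)];
                File spillFile = new File(directory, UUID.randomUUID() + ".inputchannel");

                // createNewFile() atomically creates the file only if it does not already
                // exist, so a 'false' return signals a name collision and triggers a retry.
                if (spillFile.createNewFile()) {
                    return new RandomAccessFile(spillFile, "rw").getChannel();
                }
            }
            throw new IOException(
                "Could not create a unique spill file in " + Arrays.toString(tempDirs) + ".");
        }

        public static void main(String[] args) throws IOException {
            String[] tempDirs = {System.getProperty("java.io.tmpdir")};
            try (FileChannel channel = createSpillChannel(tempDirs, new Random())) {
                System.out.println("Opened spill channel: " + channel.isOpen());
            }
        }
    }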