Posted to commits@flink.apache.org by se...@apache.org on 2017/07/23 16:48:39 UTC

[1/2] flink git commit: [hotfix] [tests] Code cleanups in ExecutionGraphRestartTest

Repository: flink
Updated Branches:
  refs/heads/master 605319b55 -> 02850545e


[hotfix] [tests] Code cleanups in ExecutionGraphRestartTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80ba4d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80ba4d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80ba4d6

Branch: refs/heads/master
Commit: d80ba4d6f12658c55b164339ec5930397fd455e3
Parents: 605319b
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 23 14:58:36 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 23 14:58:36 2017 +0200

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.java              | 39 +++++++++-----------
 1 file changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d80ba4d6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index b062369..7275e0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -63,11 +64,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Test;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -531,8 +529,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	 */
 	@Test
 	public void testSuspendWhileRestarting() throws Exception {
-		FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
-		Deadline deadline = timeout.fromNow();
+		final Time timeout = Time.of(1, TimeUnit.MINUTES);
 
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 			new ActorTaskManagerGateway(
@@ -571,7 +568,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		instance.markDead();
 
-		Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft());
+		controllableRestartStrategy.getReachedCanRestart().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 		assertEquals(JobStatus.RESTARTING, eg.getState());
 
@@ -581,7 +578,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		controllableRestartStrategy.unlockRestart();
 
-		Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft());
+		controllableRestartStrategy.getRestartDone().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 	}
@@ -795,37 +792,37 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private static class ControllableRestartStrategy implements RestartStrategy {
 
-		private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
-		private Promise<Boolean> doRestart = new Promise.DefaultPromise<>();
-		private Promise<Boolean> restartDone = new Promise.DefaultPromise<>();
+		private final OneShotLatch reachedCanRestart = new OneShotLatch();
+		private final OneShotLatch doRestart = new OneShotLatch();
+		private final OneShotLatch restartDone = new OneShotLatch();
 
-		private volatile Exception exception = null;
+		private final Time timeout;
 
-		private FiniteDuration timeout;
+		private volatile Exception exception;
 
-		public ControllableRestartStrategy(FiniteDuration timeout) {
+		public ControllableRestartStrategy(Time timeout) {
 			this.timeout = timeout;
 		}
 
 		public void unlockRestart() {
-			doRestart.success(true);
+			doRestart.trigger();
 		}
 
 		public Exception getException() {
 			return exception;
 		}
 
-		public Future<Boolean> getReachedCanRestart() {
-			return reachedCanRestart.future();
+		public OneShotLatch getReachedCanRestart() {
+			return reachedCanRestart;
 		}
 
-		public Future<Boolean> getRestartDone() {
-			return restartDone.future();
+		public OneShotLatch getRestartDone() {
+			return restartDone;
 		}
 
 		@Override
 		public boolean canRestart() {
-			reachedCanRestart.success(true);
+			reachedCanRestart.trigger();
 			return true;
 		}
 
@@ -835,13 +832,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 				@Override
 				public void run() {
 					try {
-						Await.ready(doRestart.future(), timeout);
+						doRestart.await(timeout.getSize(), timeout.getUnit());
 						restarter.triggerFullRecovery();
 					} catch (Exception e) {
 						exception = e;
 					}
 
-					restartDone.success(true);
+					restartDone.trigger();
 				}
 			});
 		}
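
The cleanup above swaps Scala's Promise/Future machinery for Flink's own OneShotLatch test utility. Below is a minimal sketch of the resulting trigger/await hand-off, using only the OneShotLatch methods that appear in this diff (trigger(), await(long, TimeUnit)); the class name and the worker body are illustrative:

	import org.apache.flink.core.testutils.OneShotLatch;

	import java.util.concurrent.TimeUnit;

	public class LatchHandOffSketch {

		public static void main(String[] args) throws Exception {
			final OneShotLatch done = new OneShotLatch();

			// the worker signals completion exactly once; no value is carried,
			// which is all the replaced Promise<Boolean> was ever used for
			Thread worker = new Thread(new Runnable() {
				@Override
				public void run() {
					// ... do the guarded work here ...
					done.trigger();
				}
			});
			worker.start();

			// the caller parks until triggered, failing after the timeout,
			// mirroring the await(timeout.toMilliseconds(), ...) calls above
			done.await(1, TimeUnit.MINUTES);
		}
	}

Note that a latch, unlike a Promise, cannot transport a failure, which is why the strategy keeps its separate volatile exception field.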


[2/2] flink git commit: [FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by se...@apache.org.
[FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

This closes #4321


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02850545
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02850545
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02850545

Branch: refs/heads/master
Commit: 02850545e3143600c7265e737e278663e3264317
Parents: d80ba4d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Jul 13 11:07:28 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 23 16:00:18 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |  2 +-
 .../kafka/KafkaTestEnvironmentImpl.java         | 19 +++++++++---
 .../kafka/internal/KafkaConsumerThread.java     | 26 +++++++++++++++-
 .../kafka/internal/KafkaConsumerThreadTest.java | 32 +++++++++++++++++++-
 .../internals/AbstractPartitionDiscoverer.java  |  2 +-
 5 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 143cb7f..0ecaebc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<kafka.version>0.10.0.1</kafka.version>
+		<kafka.version>0.10.2.1</kafka.version>
 	</properties>
 
 	<dependencies>
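
Since kafka.version is an ordinary Maven property (the comment above notes it exists precisely so users can pass custom connector versions), a downstream build can presumably still pin a different broker version from the command line, e.g. (hypothetical invocation):

	mvn clean install -Dkafka.version=0.10.1.1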

http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 051d91e..f437060 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,9 +28,9 @@ import org.apache.flink.util.NetUtils;
 
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
@@ -40,8 +40,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +59,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import scala.collection.mutable.ArraySeq;
+
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -248,10 +252,17 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
 				if (secureMode) {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
 				} else {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
 				}
+				brokerConnectionString += ",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -415,7 +426,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 			try {
 				scala.Option<String> stringNone = scala.Option.apply(null);
-				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
 				server.startup();
 				return server;
 			}
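
Both changes in this file track broker-side API changes between 0.10.0 and 0.10.2: SocketServer.boundPort() now takes a ListenerName instead of a raw SecurityProtocol (hence the forSecurityProtocol(...) wrapping in the previous hunk), and the KafkaServer constructor now expects an org.apache.kafka.common.utils.Time plus a Seq of KafkaMetricsReporter, which is why SystemTime$.MODULE$ gives way to Time.SYSTEM and an empty ArraySeq.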

http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 65300cd..de8fb0b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -92,6 +92,9 @@ public class KafkaConsumerThread extends Thread {
 	/** This lock is used to isolate the consumer for partition reassignment. */
 	private final Object consumerReassignmentLock;
 
+	/** Flag indicating whether this consumer has any assigned partitions. */
+	private boolean hasAssignedPartitions;
+
 	/**
 	 * Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map)} or {@link #shutdown()})
 	 * had attempted to wakeup the consumer while it was isolated for partition reassignment.
@@ -210,7 +213,16 @@ public class KafkaConsumerThread extends Thread {
 				}
 
 				try {
-					newPartitions = unassignedPartitionsQueue.pollBatch();
+					if (hasAssignedPartitions) {
+						newPartitions = unassignedPartitionsQueue.pollBatch();
+					}
+					else {
+						// if there are no assigned partitions, block until we get at least
+						// one instead of hot-spinning in this loop. We rely on the fact that
+						// unassignedPartitionsQueue will be closed on shutdown, so we don't
+						// block indefinitely
+						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
+					}
 					if (newPartitions != null) {
 						reassignPartitions(newPartitions);
 					}
@@ -218,6 +230,11 @@ public class KafkaConsumerThread extends Thread {
 					continue;
 				}
 
+				if (!hasAssignedPartitions) {
+					// without assigned partitions, KafkaConsumer.poll() would throw an exception
+					continue;
+				}
+
 				// get the next batch of records, unless we did not manage to hand the old batch over
 				if (records == null) {
 					try {
@@ -264,6 +281,9 @@ public class KafkaConsumerThread extends Thread {
 	public void shutdown() {
 		running = false;
 
+		// wake up all blocking calls on the queue
+		unassignedPartitionsQueue.close();
+
 		// We cannot call close() on the KafkaConsumer, because it will actually throw
 		// an exception if a concurrent call is in progress
 
@@ -335,6 +355,10 @@ public class KafkaConsumerThread extends Thread {
 	 */
 	@VisibleForTesting
 	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
+		if (newPartitions.isEmpty()) {
+			return;
+		}
+		hasAssignedPartitions = true;
 		boolean reassignmentStarted = false;
 
 		// since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
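
The comments in this hunk carry the core reasoning: without the blocking call, a consumer thread that has no partitions yet would spin through its main loop at full speed. A condensed sketch of the resulting loop shape, assuming only the ClosableBlockingQueue behavior visible in this diff (pollBatch() returns null when empty; getBatchBlocking() parks until elements arrive or the queue is closed); the import path and element type are illustrative:

	import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;

	import java.util.List;

	class ConsumeLoopSketch {

		private final ClosableBlockingQueue<String> unassignedPartitionsQueue =
				new ClosableBlockingQueue<>();

		private volatile boolean running = true;
		private boolean hasAssignedPartitions;

		void runLoop() {
			while (running) {
				List<String> newPartitions;
				try {
					newPartitions = hasAssignedPartitions
							? unassignedPartitionsQueue.pollBatch()         // non-blocking check
							: unassignedPartitionsQueue.getBatchBlocking(); // park instead of spinning
				} catch (Exception e) {
					// interrupted, or the queue was closed by shutdown()
					continue;
				}
				if (newPartitions != null && !newPartitions.isEmpty()) {
					hasAssignedPartitions = true;
					// ... reassign partitions and start polling records ...
				}
			}
		}

		void shutdown() {
			running = false;
			// closing the queue wakes a thread parked in getBatchBlocking(),
			// so shutdown never hangs on a consumer with nothing assigned
			unassignedPartitionsQueue.close();
		}
	}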

http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 091cd71..3a2f84a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -61,6 +61,36 @@ import static org.mockito.Mockito.when;
  */
 public class KafkaConsumerThreadTest {
 
+	@Test(timeout = 10000)
+	public void testCloseWithoutAssignedPartitions() throws Exception {
+		// no initial assignment
+		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
+			new LinkedHashMap<TopicPartition, Long>(),
+			Collections.<TopicPartition, Long>emptyMap(),
+			false,
+			null,
+			null);
+
+		// set up a latch so the test waits until testThread is blocked in the getBatchBlocking() method
+		final MultiShotLatch getBatchBlockingInvoked = new MultiShotLatch();
+		final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+			new ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>() {
+				@Override
+				public List<KafkaTopicPartitionState<TopicPartition>> getBatchBlocking() throws InterruptedException {
+					getBatchBlockingInvoked.trigger();
+					return super.getBatchBlocking();
+				}
+			};
+
+		final TestKafkaConsumerThread testThread =
+			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
+
+		testThread.start();
+		getBatchBlockingInvoked.await();
+		testThread.shutdown();
+		testThread.join();
+	}
+
 	/**
 	 * Tests reassignment works correctly in the case when:
 	 *  - the consumer initially had no assignments
@@ -744,6 +774,7 @@ public class KafkaConsumerThreadTest {
 			final OneShotLatch continueAssignmentLatch) {
 
 		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
 		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -754,7 +785,6 @@ public class KafkaConsumerThreadTest {
 				if (continueAssignmentLatch != null) {
 					continueAssignmentLatch.await();
 				}
-
 				return mockConsumerAssignmentAndPosition.keySet();
 			}
 		});
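
The anonymous subclass in testCloseWithoutAssignedPartitions is the synchronization hook here: triggering the MultiShotLatch from inside getBatchBlocking() tells the test exactly when the consumer thread has parked on the queue, so shutdown() and the subsequent join() are exercised against a thread that is genuinely blocked rather than one that merely might be.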

http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
index d55099a..725092e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -196,7 +196,7 @@ public abstract class AbstractPartitionDiscoverer {
 	 */
 	public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
 		if (isUndiscoveredPartition(partition)) {
-				topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
+			topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
 
 			return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
 		}