You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/23 16:48:39 UTC
[1/2] flink git commit: [hotfix] [tests] Code cleanups in ExecutionGraphRestartTest
Repository: flink
Updated Branches:
refs/heads/master 605319b55 -> 02850545e
[hotfix] [tests] Code cleanups in ExecutionGraphRestartTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80ba4d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80ba4d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80ba4d6
Branch: refs/heads/master
Commit: d80ba4d6f12658c55b164339ec5930397fd455e3
Parents: 605319b
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 23 14:58:36 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 23 14:58:36 2017 +0200
----------------------------------------------------------------------
.../ExecutionGraphRestartTest.java | 39 +++++++++-----------
1 file changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d80ba4d6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index b062369..7275e0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -63,11 +64,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
import java.io.IOException;
import java.net.InetAddress;
@@ -531,8 +529,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
*/
@Test
public void testSuspendWhileRestarting() throws Exception {
- FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
- Deadline deadline = timeout.fromNow();
+ final Time timeout = Time.of(1, TimeUnit.MINUTES);
Instance instance = ExecutionGraphTestUtils.getInstance(
new ActorTaskManagerGateway(
@@ -571,7 +568,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
instance.markDead();
- Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft());
+ controllableRestartStrategy.getReachedCanRestart().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.RESTARTING, eg.getState());
@@ -581,7 +578,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
controllableRestartStrategy.unlockRestart();
- Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft());
+ controllableRestartStrategy.getRestartDone().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.SUSPENDED, eg.getState());
}
@@ -795,37 +792,37 @@ public class ExecutionGraphRestartTest extends TestLogger {
private static class ControllableRestartStrategy implements RestartStrategy {
- private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
- private Promise<Boolean> doRestart = new Promise.DefaultPromise<>();
- private Promise<Boolean> restartDone = new Promise.DefaultPromise<>();
+ private final OneShotLatch reachedCanRestart = new OneShotLatch();
+ private final OneShotLatch doRestart = new OneShotLatch();
+ private final OneShotLatch restartDone = new OneShotLatch();
- private volatile Exception exception = null;
+ private final Time timeout;
- private FiniteDuration timeout;
+ private volatile Exception exception;
- public ControllableRestartStrategy(FiniteDuration timeout) {
+ public ControllableRestartStrategy(Time timeout) {
this.timeout = timeout;
}
public void unlockRestart() {
- doRestart.success(true);
+ doRestart.trigger();
}
public Exception getException() {
return exception;
}
- public Future<Boolean> getReachedCanRestart() {
- return reachedCanRestart.future();
+ public OneShotLatch getReachedCanRestart() {
+ return reachedCanRestart;
}
- public Future<Boolean> getRestartDone() {
- return restartDone.future();
+ public OneShotLatch getRestartDone() {
+ return restartDone;
}
@Override
public boolean canRestart() {
- reachedCanRestart.success(true);
+ reachedCanRestart.trigger();
return true;
}
@@ -835,13 +832,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Override
public void run() {
try {
- Await.ready(doRestart.future(), timeout);
+ doRestart.await(timeout.getSize(), timeout.getUnit());
restarter.triggerFullRecovery();
} catch (Exception e) {
exception = e;
}
- restartDone.success(true);
+ restartDone.trigger();
}
});
}
[2/2] flink git commit: [FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1
Posted by se...@apache.org.
[FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1
This closes #4321
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02850545
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02850545
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02850545
Branch: refs/heads/master
Commit: 02850545e3143600c7265e737e278663e3264317
Parents: d80ba4d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Jul 13 11:07:28 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jul 23 16:00:18 2017 +0200
----------------------------------------------------------------------
.../flink-connector-kafka-0.10/pom.xml | 2 +-
.../kafka/KafkaTestEnvironmentImpl.java | 19 +++++++++---
.../kafka/internal/KafkaConsumerThread.java | 26 +++++++++++++++-
.../kafka/internal/KafkaConsumerThreadTest.java | 32 +++++++++++++++++++-
.../internals/AbstractPartitionDiscoverer.java | 2 +-
5 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 143cb7f..0ecaebc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
- <kafka.version>0.10.0.1</kafka.version>
+ <kafka.version>0.10.2.1</kafka.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 051d91e..f437060 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,9 +28,9 @@ import org.apache.flink.util.NetUtils;
import kafka.admin.AdminUtils;
import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.list.UnmodifiableList;
@@ -40,8 +40,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +59,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import scala.collection.mutable.ArraySeq;
+
import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -248,10 +252,17 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
if (secureMode) {
- brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+ brokerConnectionString += hostAndPortToUrlString(
+ KafkaTestEnvironment.KAFKA_HOST,
+ brokers.get(i).socketServer().boundPort(
+ ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
} else {
- brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+ brokerConnectionString += hostAndPortToUrlString(
+ KafkaTestEnvironment.KAFKA_HOST,
+ brokers.get(i).socketServer().boundPort(
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
}
+ brokerConnectionString += ",";
}
LOG.info("ZK and KafkaServer started.");
@@ -415,7 +426,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
try {
scala.Option<String> stringNone = scala.Option.apply(null);
- KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+ KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
server.startup();
return server;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 65300cd..de8fb0b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -92,6 +92,9 @@ public class KafkaConsumerThread extends Thread {
/** This lock is used to isolate the consumer for partition reassignment. */
private final Object consumerReassignmentLock;
+ /** Indication if this consumer has any assigned partition. */
+ private boolean hasAssignedPartitions;
+
/**
* Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map)} or {@link #shutdown()})
* had attempted to wakeup the consumer while it was isolated for partition reassignment.
@@ -210,7 +213,16 @@ public class KafkaConsumerThread extends Thread {
}
try {
- newPartitions = unassignedPartitionsQueue.pollBatch();
+ if (hasAssignedPartitions) {
+ newPartitions = unassignedPartitionsQueue.pollBatch();
+ }
+ else {
+ // if no assigned partitions block until we get at least one
+ // instead of hot spinning this loop. We rely on a fact that
+ // unassignedPartitionsQueue will be closed on a shutdown, so
+ // we don't block indefinitely
+ newPartitions = unassignedPartitionsQueue.getBatchBlocking();
+ }
if (newPartitions != null) {
reassignPartitions(newPartitions);
}
@@ -218,6 +230,11 @@ public class KafkaConsumerThread extends Thread {
continue;
}
+ if (!hasAssignedPartitions) {
+ // Without assigned partitions KafkaConsumer.poll will throw an exception
+ continue;
+ }
+
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
try {
@@ -264,6 +281,9 @@ public class KafkaConsumerThread extends Thread {
public void shutdown() {
running = false;
+ // wake up all blocking calls on the queue
+ unassignedPartitionsQueue.close();
+
// We cannot call close() on the KafkaConsumer, because it will actually throw
// an exception if a concurrent call is in progress
@@ -335,6 +355,10 @@ public class KafkaConsumerThread extends Thread {
*/
@VisibleForTesting
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
+ if (newPartitions.size() == 0) {
+ return;
+ }
+ hasAssignedPartitions = true;
boolean reassignmentStarted = false;
// since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 091cd71..3a2f84a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -61,6 +61,36 @@ import static org.mockito.Mockito.when;
*/
public class KafkaConsumerThreadTest {
+ @Test(timeout = 10000)
+ public void testCloseWithoutAssignedPartitions() throws Exception {
+ // no initial assignment
+ final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
+ new LinkedHashMap<TopicPartition, Long>(),
+ Collections.<TopicPartition, Long>emptyMap(),
+ false,
+ null,
+ null);
+
+ // setup latch so the test waits until testThread is blocked on getBatchBlocking method
+ final MultiShotLatch getBatchBlockingInvoked = new MultiShotLatch();
+ final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+ new ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>() {
+ @Override
+ public List<KafkaTopicPartitionState<TopicPartition>> getBatchBlocking() throws InterruptedException {
+ getBatchBlockingInvoked.trigger();
+ return super.getBatchBlocking();
+ }
+ };
+
+ final TestKafkaConsumerThread testThread =
+ new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
+
+ testThread.start();
+ getBatchBlockingInvoked.await();
+ testThread.shutdown();
+ testThread.join();
+ }
+
/**
* Tests reassignment works correctly in the case when:
* - the consumer initially had no assignments
@@ -744,6 +774,7 @@ public class KafkaConsumerThreadTest {
final OneShotLatch continueAssignmentLatch) {
final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -754,7 +785,6 @@ public class KafkaConsumerThreadTest {
if (continueAssignmentLatch != null) {
continueAssignmentLatch.await();
}
-
return mockConsumerAssignmentAndPosition.keySet();
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
index d55099a..725092e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -196,7 +196,7 @@ public abstract class AbstractPartitionDiscoverer {
*/
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
if (isUndiscoveredPartition(partition)) {
- topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
+ topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
}