You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:43 UTC
[13/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kinesis
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index b22ba0c..a194835 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -17,17 +17,18 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang.StringUtils;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.mockito.Mockito;
@@ -39,6 +40,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the {@link ShardConsumer}.
+ */
public class ShardConsumerTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 63c6c2b..2915e2f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -37,12 +38,12 @@ import java.util.Properties;
/**
* This is a manual test for the AWS Kinesis connector in Flink.
*
- * It uses:
+ * <p>It uses:
* - A custom KinesisSerializationSchema
* - A custom KinesisPartitioner
*
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
*/
public class ManualConsumerProducerTest {
@@ -69,7 +70,7 @@ public class ManualConsumerProducerTest {
// every 10th element goes into a different stream
@Override
public String getTargetStream(String element) {
- if(element.split("-")[0].endsWith("0")) {
+ if (element.split("-")[0].endsWith("0")) {
return "flink-test-2";
}
return null; // send to default stream
@@ -90,7 +91,6 @@ public class ManualConsumerProducerTest {
});
simpleStringStream.addSink(kinesis);
-
// consuming topology
Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
@@ -104,13 +104,13 @@ public class ManualConsumerProducerTest {
String[] parts = value.split("-");
try {
long l = Long.parseLong(parts[0]);
- if(l < 0) {
+ if (l < 0) {
throw new RuntimeException("Negative");
}
- } catch(NumberFormatException nfe) {
+ } catch (NumberFormatException nfe) {
throw new RuntimeException("First part of '" + value + "' is not a valid numeric type");
}
- if(parts[1].length() != 12) {
+ if (parts[1].length() != 12) {
throw new RuntimeException("Second part of '" + value + "' doesn't have 12 characters");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 2e452c1..7abcd3c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kinesis.manualtests;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -26,6 +25,9 @@ import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +40,8 @@ import java.util.concurrent.atomic.AtomicReference;
* Then, it starts a consuming topology, ensuring that all records up to a certain
* point have been seen.
*
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
*/
public class ManualExactlyOnceTest {
@@ -67,8 +69,8 @@ public class ManualExactlyOnceTest {
// wait until stream has been created
DescribeStreamResult status = client.describeStream(streamName);
- LOG.info("status {}" ,status);
- while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+ LOG.info("status {}" , status);
+ while (!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
status = client.describeStream(streamName);
LOG.info("Status of stream {}", status);
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 71bcae3..226ac3e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -17,13 +17,6 @@
package org.apache.flink.streaming.connectors.kinesis.manualtests;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.LimitExceededException;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
-import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
-import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -32,6 +25,14 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +50,8 @@ import java.util.concurrent.atomic.AtomicReference;
* point have been seen. While the data generator and consuming topology is running,
* the kinesis stream is resharded two times.
*
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
*/
public class ManualExactlyOnceWithStreamReshardingTest {
@@ -80,7 +81,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
// wait until stream has been created
DescribeStreamResult status = client.describeStream(streamName);
LOG.info("status {}", status);
- while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+ while (!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
status = client.describeStream(streamName);
LOG.info("Status of stream {}", status);
Thread.sleep(1000);
@@ -113,7 +114,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
Thread.sleep(10);
Set<PutRecordsRequestEntry> batch = new HashSet<>();
- for (int i=count; i<count+batchSize; i++) {
+ for (int i = count; i < count + batchSize; i++) {
if (i >= TOTAL_EVENT_COUNT) {
break;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 81d0bec..8abf4bb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -32,14 +33,14 @@ import java.util.Properties;
/**
* This is a manual test for the AWS Kinesis connector in Flink.
*
- * It uses:
+ * <p>It uses:
* - A custom KinesisSerializationSchema
* - A custom KinesisPartitioner
*
- * The streams "test-flink" and "flink-test-2" must exist.
+ * <p>The streams "test-flink" and "flink-test-2" must exist.
*
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
*/
public class ManualProducerTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 86202c5..7ca05d7 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -17,15 +17,14 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import org.junit.Test;
-
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* Test for methods in the {@link KinesisProxy} class.
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
index 157964c..75356ef 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ import static org.apache.flink.test.util.TestUtils.tryExecute;
/**
* A thread that runs a topology with the FlinkKinesisConsumer as source, followed by two flat map
- * functions, one that performs artificial failures and another that validates exactly-once guarantee
+ * functions, one that performs artificial failures and another that validates exactly-once guarantee.
*/
public class ExactlyOnceValidatingConsumerThread {
@@ -94,7 +95,7 @@ public class ExactlyOnceValidatingConsumerThread {
return new Thread(exactlyOnceValidationConsumer);
}
- private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String,String>, Checkpointed<BitSet> {
+ private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, Checkpointed<BitSet> {
private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);
@@ -111,15 +112,15 @@ public class ExactlyOnceValidatingConsumerThread {
LOG.info("Consumed {}", value);
int id = Integer.parseInt(value.split("-")[0]);
- if(validator.get(id)) {
- throw new RuntimeException("Saw id " + id +" twice!");
+ if (validator.get(id)) {
+ throw new RuntimeException("Saw id " + id + " twice!");
}
validator.set(id);
- if(id > totalEventCount-1) {
+ if (id > totalEventCount - 1) {
throw new RuntimeException("Out of bounds ID observed");
}
- if(validator.nextClearBit(0) == totalEventCount) {
+ if (validator.nextClearBit(0) == totalEventCount) {
throw new SuccessException();
}
}
@@ -135,7 +136,7 @@ public class ExactlyOnceValidatingConsumerThread {
}
}
- private static class ArtificialFailOnceFlatMapper extends RichFlatMapFunction<String,String> {
+ private static class ArtificialFailOnceFlatMapper extends RichFlatMapFunction<String, String> {
int count = 0;
private final int failAtRecordCount;
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index ce5a0de..2fda0d5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -17,15 +17,16 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
@@ -67,7 +68,7 @@ public class FakeKinesisBehavioursFactory {
}
- public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String,Integer> streamsToShardCount) {
+ public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String, Integer> streamsToShardCount) {
return new NonReshardedStreamsKinesis(streamsToShardCount);
}
@@ -79,14 +80,14 @@ public class FakeKinesisBehavioursFactory {
public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(final int numOfRecords, final int numOfGetRecordsCalls) {
return new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls);
}
-
+
public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
final int numOfRecords, final int numOfGetRecordsCall, final int orderOfCallToExpire) {
return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
numOfRecords, numOfGetRecordsCall, orderOfCallToExpire);
}
- public static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
+ private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
private boolean expiredOnceAlready = false;
private boolean expiredIteratorRefreshed = false;
@@ -103,7 +104,7 @@ public class FakeKinesisBehavioursFactory {
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
- if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) {
+ if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) {
// we fake only once the expired iterator exception at the specified get records attempt order
expiredOnceAlready = true;
throw new ExpiredIteratorException("Artificial expired shard iterator");
@@ -130,7 +131,7 @@ public class FakeKinesisBehavioursFactory {
// fake the iterator refresh when this is called again after getRecords throws expired iterator
// exception on the orderOfCallToExpire attempt
expiredIteratorRefreshed = true;
- return String.valueOf(orderOfCallToExpire-1);
+ return String.valueOf(orderOfCallToExpire - 1);
}
}
}
@@ -141,7 +142,7 @@ public class FakeKinesisBehavioursFactory {
protected final int totalNumOfRecords;
- protected final Map<String,List<Record>> shardItrToRecordBatch;
+ protected final Map<String, List<Record>> shardItrToRecordBatch;
public SingleShardEmittingFixNumOfRecordsKinesis(final int numOfRecords, final int numOfGetRecordsCalls) {
this.totalNumOfRecords = numOfRecords;
@@ -151,9 +152,9 @@ public class FakeKinesisBehavioursFactory {
this.shardItrToRecordBatch = new HashMap<>();
int numOfAlreadyPartitionedRecords = 0;
- int numOfRecordsPerBatch = numOfRecords/numOfGetRecordsCalls + 1;
- for (int batch=0; batch<totalNumOfGetRecordsCalls; batch++) {
- if (batch != totalNumOfGetRecordsCalls-1) {
+ int numOfRecordsPerBatch = numOfRecords / numOfGetRecordsCalls + 1;
+ for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) {
+ if (batch != totalNumOfGetRecordsCalls - 1) {
shardItrToRecordBatch.put(
String.valueOf(batch),
createRecordBatchWithRange(
@@ -176,8 +177,8 @@ public class FakeKinesisBehavioursFactory {
return new GetRecordsResult()
.withRecords(shardItrToRecordBatch.get(shardIterator))
.withNextShardIterator(
- (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls-1)
- ? null : String.valueOf(Integer.valueOf(shardIterator)+1)); // last next shard iterator is null
+ (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
+ ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
}
@Override
@@ -211,8 +212,8 @@ public class FakeKinesisBehavioursFactory {
private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();
- public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) {
- for (Map.Entry<String,Integer> streamToShardCount : streamsToShardCount.entrySet()) {
+ public NonReshardedStreamsKinesis(Map<String, Integer> streamsToShardCount) {
+ for (Map.Entry<String, Integer> streamToShardCount : streamsToShardCount.entrySet()) {
String streamName = streamToShardCount.getKey();
int shardCount = streamToShardCount.getValue();
@@ -220,7 +221,7 @@ public class FakeKinesisBehavioursFactory {
// don't do anything
} else {
List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
- for (int i=0; i < shardCount; i++) {
+ for (int i = 0; i < shardCount; i++) {
shardsOfStream.add(
new StreamShardHandle(
streamName,
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
index fdfdfe1..699c977 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -25,6 +24,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,12 +98,12 @@ public class KinesisEventsGeneratorProducerThread {
@Override
public void run(SourceContext<String> ctx) throws Exception {
long seq = 0;
- while(running) {
+ while (running) {
Thread.sleep(10);
String evt = (seq++) + "-" + RandomStringUtils.randomAlphabetic(12);
ctx.collect(evt);
LOG.info("Emitting event {}", evt);
- if(seq >= limit) {
+ if (seq >= limit) {
break;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
index c8dd347..0377a66 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
@@ -17,8 +17,12 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
+/**
+ * A generator for Kinesis shard IDs.
+ *
+ * <p>Kinesis shard ids are in the form of: shardId-\d{12}
+ */
public class KinesisShardIdGenerator {
- // Kinesis shards ids are in the form of: ^shardId-\d{12}
public static String generateFromShardOrder(int order) {
return String.format("shardId-%012d", order);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
index 80ad06c..6c91eaf 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
@@ -20,12 +20,16 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Properties;
+/**
+ * Extension of the {@link FlinkKinesisConsumer} for testing.
+ */
public class TestableFlinkKinesisConsumer extends FlinkKinesisConsumer<String> {
private final RuntimeContext mockedRuntimeCtx;
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index bb644ba..b6f3cbc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -37,6 +38,9 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
+/**
+ * Extension of the {@link KinesisDataFetcher} for testing.
+ */
public class TestableKinesisDataFetcher extends KinesisDataFetcher<String> {
private static final Object fakeCheckpointLock = new Object();
@@ -45,14 +49,15 @@ public class TestableKinesisDataFetcher extends KinesisDataFetcher<String> {
private OneShotLatch runWaiter;
- public TestableKinesisDataFetcher(List<String> fakeStreams,
- Properties fakeConfiguration,
- int fakeTotalCountOfSubtasks,
- int fakeTndexOfThisSubtask,
- AtomicReference<Throwable> thrownErrorUnderTest,
- LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
- HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
- KinesisProxyInterface fakeKinesis) {
+ public TestableKinesisDataFetcher(
+ List<String> fakeStreams,
+ Properties fakeConfiguration,
+ int fakeTotalCountOfSubtasks,
+ int fakeTndexOfThisSubtask,
+ AtomicReference<Throwable> thrownErrorUnderTest,
+ LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
+ HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+ KinesisProxyInterface fakeKinesis) {
super(fakeStreams,
getMockedSourceContext(),
fakeCheckpointLock,