You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2019/02/13 15:14:56 UTC
[samza] branch master updated: SAMZA-2103: [samza-aws] code cleanup and refactoring (#914)
This is an automated email from the ASF dual-hosted git repository.
jmakes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 509bee8 SAMZA-2103: [samza-aws] code cleanup and refactoring (#914)
509bee8 is described below
commit 509bee8668f69fa1cac1da1e1ebb30fad1b175a3
Author: Andrei Paikin <an...@gmail.com>
AuthorDate: Wed Feb 13 18:14:52 2019 +0300
SAMZA-2103: [samza-aws] code cleanup and refactoring (#914)
---
.../kinesis/consumer/KinesisRecordProcessor.java | 4 +--
.../kinesis/consumer/KinesisSystemConsumer.java | 4 +--
.../consumer/KinesisSystemConsumerOffset.java | 9 ++---
.../system/kinesis/consumer/SSPAllocator.java | 4 +--
.../descriptors/KinesisInputDescriptor.java | 14 ++++----
.../descriptors/KinesisSystemDescriptor.java | 18 +++++-----
.../consumer/TestKinesisRecordProcessor.java | 36 +++++++------------
.../consumer/TestKinesisSystemConsumer.java | 40 +++++++++-------------
8 files changed, 55 insertions(+), 74 deletions(-)
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
index 53ff27f..666ebde 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
@@ -63,7 +63,7 @@ public class KinesisRecordProcessor implements IRecordProcessor {
private final SystemStreamPartition ssp;
private String shardId;
- private KinesisRecordProcessorListener listener;
+ private final KinesisRecordProcessorListener listener;
private IRecordProcessorCheckpointer checkpointer;
private ExtendedSequenceNumber initSeqNumber;
@@ -85,7 +85,7 @@ public class KinesisRecordProcessor implements IRecordProcessor {
*/
@Override
public void initialize(InitializationInput initializationInput) {
- Validate.isTrue(listener != null, "There is no listener set for the processor.");
+ Validate.notNull(listener, "There is no listener set for the processor.");
initSeqNumber = initializationInput.getExtendedSequenceNumber();
shardId = initializationInput.getShardId();
LOG.info("Initialization done for {} with sequence {}", this,
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
index 6afffd3..b9808f7 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
@@ -212,8 +212,8 @@ public class KinesisSystemConsumer extends BlockingEnvelopeMap implements Checkp
}
@Override
- public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
- LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
+ public void afterCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
+ LOG.info("afterCheckpoint called with sspOffsets {}", sspOffsets);
sspOffsets.forEach((ssp, offset) -> {
KinesisRecordProcessor processor = processors.get(ssp);
KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
index 13296ca..541c19c 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
@@ -40,9 +40,9 @@ import org.codehaus.jackson.annotate.JsonProperty;
public class KinesisSystemConsumerOffset {
@JsonProperty("shardId")
- private String shardId;
+ private final String shardId;
@JsonProperty("seqNumber")
- private String seqNumber;
+ private final String seqNumber;
@JsonCreator
KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
@@ -91,10 +91,7 @@ public class KinesisSystemConsumerOffset {
return false;
}
String thatSeqNumber = ((KinesisSystemConsumerOffset) o).getSeqNumber();
- if (!(seqNumber == null ? thatSeqNumber == null : seqNumber.equals(thatSeqNumber))) {
- return false;
- }
- return true;
+ return seqNumber == null ? thatSeqNumber == null : seqNumber.equals(thatSeqNumber);
}
@Override
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
index 4b7cff8..83b6b36 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
@@ -46,10 +46,10 @@ class SSPAllocator {
private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();
synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
- Validate.isTrue(availableSsps.get(stream) != null,
+ Validate.notNull(availableSsps.get(stream),
String.format("availableSsps is null for stream %s", stream));
- if (availableSsps.get(stream).size() <= 0) {
+ if (availableSsps.get(stream).isEmpty()) {
// Set a flag in system consumer so that it could throw an exception in the subsequent poll.
throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
+ " registered. Could be the result of dynamic resharding.", stream));
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
index 1c2e0a2..fcce49e 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
@@ -67,7 +67,7 @@ public class KinesisInputDescriptor<StreamMessageType>
* @return this input descriptor
*/
public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
- this.region = Optional.of(StringUtils.stripToNull(region));
+ this.region = Optional.ofNullable(StringUtils.stripToNull(region));
return this;
}
@@ -77,7 +77,7 @@ public class KinesisInputDescriptor<StreamMessageType>
* @return this input descriptor
*/
public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey) {
- this.accessKey = Optional.of(StringUtils.stripToNull(accessKey));
+ this.accessKey = Optional.ofNullable(StringUtils.stripToNull(accessKey));
return this;
}
@@ -87,7 +87,7 @@ public class KinesisInputDescriptor<StreamMessageType>
* @return this input descriptor
*/
public KinesisInputDescriptor<StreamMessageType> withSecretKey(String secretKey) {
- this.secretKey = Optional.of(StringUtils.stripToNull(secretKey));
+ this.secretKey = Optional.ofNullable(StringUtils.stripToNull(secretKey));
return this;
}
@@ -110,13 +110,13 @@ public class KinesisInputDescriptor<StreamMessageType>
String clientConfigPrefix =
String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);
- this.region.ifPresent(
+ region.ifPresent(
val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
- this.accessKey.ifPresent(
+ accessKey.ifPresent(
val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
- this.secretKey.ifPresent(
+ secretKey.ifPresent(
val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
- this.kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
+ kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
return config;
}
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
index ffeb667..678dfe6 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
@@ -72,7 +72,7 @@ public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescr
* @return this system descriptor
*/
public KinesisSystemDescriptor withRegion(String region) {
- this.region = Optional.of(StringUtils.stripToNull(region));
+ this.region = Optional.ofNullable(StringUtils.stripToNull(region));
return this;
}
@@ -102,7 +102,7 @@ public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescr
* @return this system descriptor
*/
public KinesisSystemDescriptor withProxyHost(String proxyHost) {
- this.proxyHost = Optional.of(StringUtils.stripToNull(proxyHost));
+ this.proxyHost = Optional.ofNullable(StringUtils.stripToNull(proxyHost));
return this;
}
@@ -121,18 +121,18 @@ public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescr
Map<String, String> config = new HashMap<>(super.toConfig());
String systemName = getSystemName();
- this.region.ifPresent(
+ region.ifPresent(
val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
- this.proxyHost.ifPresent(
+ proxyHost.ifPresent(
val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
- this.proxyPort.ifPresent(
+ proxyPort.ifPresent(
val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
- final String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
- this.kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
+ String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
+ kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
- final String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
- this.awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
+ String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
+ awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
return config;
}
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
index 6379fcc..6f1f052 100644
--- a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -56,20 +56,16 @@ public class TestKinesisRecordProcessor {
KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
@Test
- public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
- NoSuchFieldException, IllegalAccessException {
+ public void testLifeCycleWithEvents() {
testLifeCycleHelper(5);
}
@Test
- public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
- NoSuchFieldException, IllegalAccessException {
+ public void testLifeCycleWithNoEvents() {
testLifeCycleHelper(0);
}
- private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
- InvalidStateException, NoSuchFieldException,
- IllegalAccessException {
+ private void testLifeCycleHelper(int numRecords) {
String system = "kinesis";
String stream = "stream";
final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -102,7 +98,7 @@ public class TestKinesisRecordProcessor {
// Verification steps
// Verify there is a receivedRecords call to listener.
- Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+ Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
if (numRecords > 0) {
// Call checkpoint on last record
@@ -114,7 +110,7 @@ public class TestKinesisRecordProcessor {
shutDownProcessor(processor, ShutdownReason.ZOMBIE);
// Verify that the processor is shutdown.
- Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+ Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
}
/**
@@ -125,8 +121,7 @@ public class TestKinesisRecordProcessor {
* before it processed any records.
*/
@Test
- public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
- NoSuchFieldException, IllegalAccessException {
+ public void testCheckpointAfterInit() {
String system = "kinesis";
String stream = "stream";
final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -160,26 +155,21 @@ public class TestKinesisRecordProcessor {
shutDownProcessor(processor, ShutdownReason.ZOMBIE);
// Verify that the processor is shutdown.
- Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+ Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
}
@Test
- public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
- InvalidStateException, NoSuchFieldException,
- IllegalAccessException {
+ public void testShutdownDuringReshardWithEvents() throws InterruptedException {
testShutdownDuringReshardHelper(5);
}
@Test
- public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
- InvalidStateException, NoSuchFieldException,
- IllegalAccessException {
+ public void testShutdownDuringReshardWithNoEvents() throws InterruptedException {
testShutdownDuringReshardHelper(0);
}
private void testShutdownDuringReshardHelper(int numRecords)
- throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
- IllegalAccessException {
+ throws InterruptedException {
String system = "kinesis";
String stream = "stream";
final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -212,7 +202,7 @@ public class TestKinesisRecordProcessor {
// Verification steps
// Verify there is a receivedRecords call to listener.
- Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+ Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
// Call shutdown (with TERMINATE reason) on processor and verify that the processor does not call shutdown on the
// listener until checkpoint is called for the last record consumed from shard.
@@ -240,7 +230,7 @@ public class TestKinesisRecordProcessor {
}
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
- List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
+ List<KinesisRecordProcessor> processors) {
Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
processors.forEach(processor -> {
try {
@@ -298,4 +288,4 @@ public class TestKinesisRecordProcessor {
}
return records;
}
-}
\ No newline at end of file
+}
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
index ade02ac..fe7fa96 100644
--- a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -65,8 +65,7 @@ public class TestKinesisSystemConsumer {
private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
@Test
- public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
- NoSuchFieldException, IllegalAccessException {
+ public void testProcessRecords() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
String system = "kinesis";
String stream = "stream";
int numShards = 2;
@@ -76,9 +75,7 @@ public class TestKinesisSystemConsumer {
}
@Test
- public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
- InvalidStateException, NoSuchFieldException,
- IllegalAccessException {
+ public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
String system = "kinesis";
String stream = "stream";
int numShards = 1;
@@ -96,8 +93,7 @@ public class TestKinesisSystemConsumer {
* 5. Shutting down (due to re-assignment or lease expiration) record processors.
*/
private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
- throws InterruptedException, ShutdownException, InvalidStateException,
- NoSuchFieldException, IllegalAccessException {
+ throws InterruptedException, NoSuchFieldException, IllegalAccessException {
KinesisConfig kConfig = new KinesisConfig(new MapConfig());
// Create consumer
@@ -140,24 +136,22 @@ public class TestKinesisSystemConsumer {
try {
KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
- if (numRecordsPerShard > 0) {
- // Verify that the read messages are received in order and are the same as input records
- Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
- List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
- List<Record> inputRecords = inputRecordMap.get(processor);
- verifyRecords(envelopes, inputRecords, processor.getShardId());
-
- // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
- IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
- consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
- ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
- verify(getCheckpointer(processor)).checkpoint(argument.capture());
- Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
- }
+ // Verify that the read messages are received in order and are the same as input records
+ Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
+ List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+ List<Record> inputRecords = inputRecordMap.get(processor);
+ verifyRecords(envelopes, inputRecords, processor.getShardId());
+
+ // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+ IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+ consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+ ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+ verify(getCheckpointer(processor)).checkpoint(argument.capture());
+ Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
// Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
shutDownProcessor(processor, ShutdownReason.ZOMBIE);
- Assert.assertTrue(!sspToProcessorMap.containsValue(processor));
+ Assert.assertFalse(sspToProcessorMap.containsValue(processor));
Assert.assertTrue(isSspAvailable(consumer, ssp));
} catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
throw new RuntimeException(ex);
@@ -221,7 +215,7 @@ public class TestKinesisSystemConsumer {
Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
record.getData().rewind();
- Assert.assertTrue(outputData.equals(record.getData()));
+ Assert.assertEquals(outputData, record.getData());
verifyOffset(envelope.getOffset(), record, shardId);
});
}