You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/28 21:12:14 UTC
[1/2] samza git commit: SAMZA-1515; Implement a consumer for Kinesis
Repository: samza
Updated Branches:
refs/heads/master 5e68d621a -> 9961023f7
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
new file mode 100644
index 0000000..5b3b335
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link KinesisSystemFactory}.
+ */
+public class TestKinesisSystemFactory {
+  // String.format pattern (not a regex) for the config key that names a system's factory class.
+  private static final String SYSTEM_FACTORY_FORMAT = "systems.%s.samza.factory";
+  private static final String KINESIS_SYSTEM_FACTORY = KinesisSystemFactory.class.getName();
+
+  /**
+   * Verifies that the factory builds a non-null consumer for the configured system and that the consumer is a
+   * different object from the admin built for the same system.
+   */
+  @Test
+  public void testGetConsumer() {
+    String systemName = "test";
+    Config config = buildKinesisConsumerConfig(systemName);
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    MetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    // Build the consumer for the same system the config was built for, not a separately hard-coded name.
+    Object consumer = factory.getConsumer(systemName, config, metricsRegistry);
+    Assert.assertNotNull(consumer);
+    Assert.assertNotSame(consumer, factory.getAdmin(systemName, config));
+  }
+
+  // NOTE(review): the three tests below are @Ignore'd without a stated reason. Confirm whether KinesisSystemFactory
+  // is meant to reject these configurations, then either re-enable them or record why they are disabled.
+
+  /**
+   * Expects {@code getAdmin} to throw {@link ConfigException} when the SSP grouper factory is
+   * SystemStreamPartitionGrouperFactory instead of AllSspToSingleTaskGrouperFactory.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithIncorrectSspGrouper() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory");
+    factory.getAdmin(systemName, config);
+  }
+
+  /**
+   * Expects {@code getAdmin} to throw {@link ConfigException} when {@code task.broadcast.inputs} is configured.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBroadcastStreams() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory",
+        "test.stream#0");
+    factory.getAdmin(systemName, config);
+  }
+
+  /**
+   * Expects {@code getAdmin} to throw {@link ConfigException} when a stream of the system is marked as bootstrap.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBootstrapStream() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory",
+        null,
+        "kinesis-stream"
+    );
+    factory.getAdmin(systemName, config);
+  }
+
+  /** Builds a config for {@code systemName} using AllSspToSingleTaskGrouperFactory as the SSP grouper. */
+  private static Config buildKinesisConsumerConfig(String systemName) {
+    return buildKinesisConsumerConfig(systemName, AllSspToSingleTaskGrouperFactory.class.getCanonicalName());
+  }
+
+  /** Builds a config for {@code systemName} with the given SSP grouper and no broadcast or bootstrap streams. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, null);
+  }
+
+  /** Builds a config for {@code systemName} with the given SSP grouper, optional broadcast inputs, no bootstrap. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue, null);
+  }
+
+  /** Builds a {@link MapConfig} from {@link #buildSamzaKinesisSystemConfig}. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> props = buildSamzaKinesisSystemConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue,
+        bootstrapStreamName);
+    return new MapConfig(props);
+  }
+
+  /**
+   * Builds the raw config map: the system's factory class, the SSP grouper factory and, when non-null and non-empty,
+   * the broadcast inputs and a bootstrap flag for {@code bootstrapStreamName}.
+   */
+  private static Map<String, String> buildSamzaKinesisSystemConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> result = new HashMap<>();
+    result.put(String.format(SYSTEM_FACTORY_FORMAT, systemName), KINESIS_SYSTEM_FACTORY);
+    result.put("job.systemstreampartition.grouper.factory", sspGrouperFactory);
+    if (broadcastStreamConfigValue != null && !broadcastStreamConfigValue.isEmpty()) {
+      result.put("task.broadcast.inputs", broadcastStreamConfigValue);
+    }
+    if (bootstrapStreamName != null && !bootstrapStreamName.isEmpty()) {
+      result.put("systems." + systemName + ".streams." + bootstrapStreamName + ".samza.bootstrap", "true");
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
new file mode 100644
index 0000000..6379fcc
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestKinesisRecordProcessor {
+ private static final long MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS =
+ KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
+
+ @Test
+ public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+ NoSuchFieldException, IllegalAccessException {
+ testLifeCycleHelper(5);
+ }
+
+ @Test
+ public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+ NoSuchFieldException, IllegalAccessException {
+ testLifeCycleHelper(0);
+ }
+
+ private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
+ InvalidStateException, NoSuchFieldException,
+ IllegalAccessException {
+ String system = "kinesis";
+ String stream = "stream";
+ final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+ final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+
+ KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+ @Override
+ public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+ receivedRecordsLatch.countDown();
+ }
+
+ @Override
+ public void onShutdown(SystemStreamPartition ssp) {
+ receivedShutdownLatch.countDown();
+ }
+ };
+
+ KinesisRecordProcessor processor =
+ new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+ // Initialize the processor
+ ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+ InitializationInput initializationInput =
+ new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+ processor.initialize(initializationInput);
+
+ // Call processRecords on the processor
+ List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+ // Verification steps
+
+ // Verify there is a receivedRecords call to listener.
+ Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+
+ if (numRecords > 0) {
+ // Call checkpoint on last record
+ processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+ }
+
+
+ // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+ shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+ // Verify that the processor is shutdown.
+ Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+ }
+
+ /**
+ * Test the scenario where a processor instance is created for a shard and while it is processing records, it got
+ * re-assigned to the same consumer. This results in a new processor instance owning the shard and this instance
+ * could receive checkpoint calls for the records that are processed by the old processor instance. This test covers
+ * the scenario where the new instance receives the checkpoint call while it is done with the initialization phase and
+ * before it processed any records.
+ */
+ @Test
+ public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
+ NoSuchFieldException, IllegalAccessException {
+ String system = "kinesis";
+ String stream = "stream";
+ final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+
+ KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+ @Override
+ public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+ }
+
+ @Override
+ public void onShutdown(SystemStreamPartition ssp) {
+ receivedShutdownLatch.countDown();
+ }
+ };
+
+ KinesisRecordProcessor processor =
+ new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+ // Initialize the processor
+ ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+ InitializationInput initializationInput =
+ new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+ processor.initialize(initializationInput);
+
+ // Call checkpoint. This checkpoint could have originally headed to the processor instance for the same shard but
+ // due to reassignment a new processor instance is created.
+ processor.checkpoint("1234567");
+
+
+ // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+ shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+ // Verify that the processor is shutdown.
+ Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+ }
+
+ @Test
+ public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
+ InvalidStateException, NoSuchFieldException,
+ IllegalAccessException {
+ testShutdownDuringReshardHelper(5);
+ }
+
+ @Test
+ public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
+ InvalidStateException, NoSuchFieldException,
+ IllegalAccessException {
+ testShutdownDuringReshardHelper(0);
+ }
+
+ private void testShutdownDuringReshardHelper(int numRecords)
+ throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+ IllegalAccessException {
+ String system = "kinesis";
+ String stream = "stream";
+ final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+ final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+
+ KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+ @Override
+ public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+ receivedRecordsLatch.countDown();
+ }
+
+ @Override
+ public void onShutdown(SystemStreamPartition ssp) {
+ receivedShutdownLatch.countDown();
+ }
+ };
+
+ KinesisRecordProcessor processor =
+ new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+ // Initialize the processor
+ ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+ InitializationInput initializationInput =
+ new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+ processor.initialize(initializationInput);
+
+ // Call processRecords on the processor
+ List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+ // Verification steps
+
+ // Verify there is a receivedRecords call to listener.
+ Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+
+ // Call shutdown (with TERMINATE reason) on processor and verify that the processor does not call shutdown on the
+ // listener until checkpoint is called for the last record consumed from shard.
+ new Thread(() -> shutDownProcessor(processor, ShutdownReason.TERMINATE)).start();
+
+ // If there are no records, the processor should shutdown immediately.
+ if (numRecords == 0) {
+ Assert.assertTrue("Unable to shutdown processor.",
+ receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+ return;
+ }
+
+ Assert.assertFalse("Processor shutdown too early.",
+ receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+ // Call checkpoint for the last but one record and the processor should still not call shutdown on listener.
+ processor.checkpoint(records.get(records.size() - 2).getSequenceNumber());
+ Assert.assertFalse("Processor shutdown too early.",
+ receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+ // Call checkpoint for the last record and the parent partition should be removed from mapper.
+ processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+ Assert.assertTrue("Unable to shutdown processor.",
+ receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+ }
+
+ static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
+ List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
+ Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
+ processors.forEach(processor -> {
+ try {
+ // Create records and call process records
+ IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
+ doNothing().when(checkpointer).checkpoint(anyString());
+ doNothing().when(checkpointer).checkpoint();
+ ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
+ when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
+ when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
+ List<Record> inputRecords = createRecords(numRecordsPerShard);
+ processorRecordMap.put(processor, inputRecords);
+ when(processRecordsInput.getRecords()).thenReturn(inputRecords);
+ processor.processRecords(processRecordsInput);
+ } catch (ShutdownException | InvalidStateException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ return processorRecordMap;
+ }
+
+ static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
+ try {
+ ShutdownInput shutdownInput = Mockito.mock(ShutdownInput.class);
+ when(shutdownInput.getShutdownReason()).thenReturn(reason);
+ when(shutdownInput.getCheckpointer()).thenReturn(getCheckpointer(processor));
+ processor.shutdown(shutdownInput);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ static IRecordProcessorCheckpointer getCheckpointer(KinesisRecordProcessor processor)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field f = processor.getClass().getDeclaredField("checkpointer");
+ f.setAccessible(true);
+ return (IRecordProcessorCheckpointer) f.get(processor);
+ }
+
+ private static List<Record> createRecords(int numRecords) {
+ List<Record> records = new ArrayList<>(numRecords);
+ Random rand = new Random();
+
+ for (int i = 0; i < numRecords; i++) {
+ String dataStr = "testData-" + System.currentTimeMillis();
+ ByteBuffer data = ByteBuffer.wrap(dataStr.getBytes(StandardCharsets.UTF_8));
+ String key = String.format("partitionKey-%d", rand.nextLong());
+ String seqNum = String.format("%04d", 5 * i + 1);
+ Record record = new Record()
+ .withData(data)
+ .withPartitionKey(key)
+ .withSequenceNumber(seqNum)
+ .withApproximateArrivalTimestamp(new Date());
+ records.add(record);
+ }
+ return records;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
new file mode 100644
index 0000000..ade02ac
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+
+import static org.apache.samza.system.kinesis.consumer.TestKinesisRecordProcessor.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests KinesisSystemConsumer and KinesisRecordProcessor together. Records handed to the processors created by the
+ * consumer's record processor factory must come out of the consumer's poll in order, and must be checkpointable
+ * through the consumer's onCheckpoint.
+ */
+public class TestKinesisSystemConsumer {
+  private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
+  // Upper bound on how long readEvents keeps polling before it fails the test.
+  private static final long READ_EVENTS_TIMEOUT_MS = Duration.ofSeconds(30).toMillis();
+
+  @Test
+  public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
+      NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 2;
+    int numRecordsPerShard = 5;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  @Test
+  public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
+      InvalidStateException, NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 1;
+    int numRecordsPerShard = 0;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  /**
+   * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of
+   * shards:
+   * 1. Creation of record processors.
+   * 2. Initialization of record processors.
+   * 3. Processing records via record processors.
+   * 4. Calling checkpoint on record processors.
+   * 5. Shutting down (due to re-assignment or lease expiration) record processors.
+   */
+  private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
+      throws InterruptedException, ShutdownException, InvalidStateException,
+      NoSuchFieldException, IllegalAccessException {
+
+    KinesisConfig kConfig = new KinesisConfig(new MapConfig());
+    // Create consumer
+    KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
+    initializeMetrics(consumer, stream);
+
+    List<SystemStreamPartition> ssps = new LinkedList<>();
+    IntStream.range(0, numShards)
+        .forEach(p -> ssps.add(new SystemStreamPartition(system, stream, new Partition(p))));
+    ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));
+
+    // Create Kinesis record processor factory
+    IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);
+
+    // Create and initialize Kinesis record processor
+    Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
+    List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());
+
+    // Generate records to Kinesis record processor
+    Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);
+
+    // Verification steps
+
+    // Read the events of every shard from the BEM queue. Waiting for only numRecordsPerShard events could stop
+    // before the other shards' events have been polled.
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
+        readEvents(new HashSet<>(ssps), consumer, numShards * numRecordsPerShard);
+    if (numRecordsPerShard == 0) {
+      // No input records and hence no messages
+      Assert.assertEquals(0, messages.size());
+      return;
+    }
+    Assert.assertEquals(numShards, messages.size());
+
+    Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
+    ssps.forEach(ssp -> {
+      try {
+        KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
+
+        // Verify that the read messages are received in order and are the same as input records
+        List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+        Assert.assertEquals(numRecordsPerShard, envelopes.size());
+        List<Record> inputRecords = inputRecordMap.get(processor);
+        verifyRecords(envelopes, inputRecords, processor.getShardId());
+
+        // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+        IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+        consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+        verify(getCheckpointer(processor)).checkpoint(argument.capture());
+        Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
+
+        // Call shutdown (with ZOMBIE reason) on processor and verify that shutdown freed the ssp mapping
+        shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+        Assert.assertFalse(sspToProcessorMap.containsValue(processor));
+        Assert.assertTrue(isSspAvailable(consumer, ssp));
+      } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
+  }
+
+  /**
+   * Creates {@code numShards} processors from {@code factory} and initializes each with shard id
+   * "shard-%05d" (0-based) and sequence number "0000". Returns the processors keyed by shard id.
+   */
+  private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
+    Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
+    IntStream.range(0, numShards)
+        .forEach(p -> {
+          String shardId = String.format("shard-%05d", p);
+          // Create Kinesis processor
+          KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
+
+          // Initialize the shard
+          ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+          InitializationInput initializationInput =
+              new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
+          processor.initialize(initializationInput);
+          processorMap.put(shardId, processor);
+        });
+    return processorMap;
+  }
+
+  /**
+   * Polls {@code consumer} for {@code ssps} until at least {@code numEvents} envelopes have been received in total,
+   * grouping them by SSP in arrival order.
+   *
+   * @throws SamzaException if fewer than {@code numEvents} envelopes arrive within {@link #READ_EVENTS_TIMEOUT_MS}
+   */
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
+      KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
+    int totalEventsConsumed = 0;
+    long deadline = System.currentTimeMillis() + READ_EVENTS_TIMEOUT_MS;
+
+    // Bounded by a deadline. Without it, missing events would hang the test forever instead of failing it below.
+    while (totalEventsConsumed < numEvents && System.currentTimeMillis() < deadline) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
+          consumer.poll(ssps, Duration.ofSeconds(1).toMillis());
+      receivedMessages.forEach((polledSsp, envelopes) ->
+          messages.computeIfAbsent(polledSsp, k -> new ArrayList<>()).addAll(envelopes));
+      totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
+    }
+
+    if (totalEventsConsumed < numEvents) {
+      String msg = String.format("Received only %d of %d events", totalEventsConsumed, numEvents);
+      throw new SamzaException(msg);
+    }
+    return messages;
+  }
+
+  /**
+   * Asserts that {@code outputRecords} match {@code inputRecords} one-to-one and in order: partition key, sequence
+   * number, arrival timestamp, shard id, payload bytes and offset.
+   */
+  private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
+    Assert.assertEquals(inputRecords.size(), outputRecords.size());
+    Iterator<IncomingMessageEnvelope> outputRecordsIter = outputRecords.iterator();
+    inputRecords.forEach(record -> {
+      IncomingMessageEnvelope envelope = outputRecordsIter.next();
+      String outputKey = (String) envelope.getKey();
+      KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+      Assert.assertEquals(record.getPartitionKey(), outputKey);
+      Assert.assertEquals(record.getSequenceNumber(), kinesisMessageEnvelope.getSequenceNumber());
+      Assert.assertEquals(record.getApproximateArrivalTimestamp(),
+          kinesisMessageEnvelope.getApproximateArrivalTimestamp());
+      Assert.assertEquals(shardId, kinesisMessageEnvelope.getShardId());
+      ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
+      // Rewind before comparing; presumably the buffer's position was advanced when the record was consumed.
+      record.getData().rewind();
+      Assert.assertEquals(record.getData(), outputData);
+      verifyOffset(envelope.getOffset(), record, shardId);
+    });
+  }
+
+  /** Asserts that {@code offset} parses to the input record's sequence number and the given shard id. */
+  private void verifyOffset(String offset, Record inputRecord, String shardId) {
+    KinesisSystemConsumerOffset ckpt = KinesisSystemConsumerOffset.parse(offset);
+    Assert.assertEquals(inputRecord.getSequenceNumber(), ckpt.getSeqNumber());
+    Assert.assertEquals(shardId, ckpt.getShardId());
+  }
+
+  /** Initializes the consumer's private {@code metrics} field for {@code stream} via reflection. */
+  private void initializeMetrics(KinesisSystemConsumer consumer, String stream)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("metrics");
+    f.setAccessible(true);
+    KinesisSystemConsumerMetrics metrics = (KinesisSystemConsumerMetrics) f.get(consumer);
+    metrics.initializeMetrics(Collections.singleton(stream));
+  }
+
+  /** Returns the consumer's private {@code processors} map (the live instance) via reflection. */
+  @SuppressWarnings("unchecked")
+  private Map<SystemStreamPartition, KinesisRecordProcessor> getProcessorMap(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("processors");
+    f.setAccessible(true);
+    return (Map<SystemStreamPartition, KinesisRecordProcessor>) f.get(consumer);
+  }
+
+  /** Returns whether {@code ssp} is in its stream's set in the SSP allocator's private {@code availableSsps}. */
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(KinesisSystemConsumer consumer, SystemStreamPartition ssp)
+      throws NoSuchFieldException, IllegalAccessException {
+    SSPAllocator sspAllocator = getSspAllocator(consumer);
+    Field f = sspAllocator.getClass().getDeclaredField("availableSsps");
+    f.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableSsps = (Map<String, Set<SystemStreamPartition>>) f.get(
+        sspAllocator);
+    return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp);
+  }
+
+  /** Returns the consumer's private {@code sspAllocator} field via reflection. */
+  private SSPAllocator getSspAllocator(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("sspAllocator");
+    f.setAccessible(true);
+    return (SSPAllocator) f.get(consumer);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..615a06e
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests string round-tripping ({@code toString} / {@code parse}) and equality of {@link KinesisSystemConsumerOffset}.
+ */
+public class TestKinesisSystemConsumerOffset {
+  /** An offset serialized with {@code toString()} and parsed back must equal the original. */
+  @Test
+  public void testEquality() {
+    KinesisSystemConsumerOffset inCkpt = new KinesisSystemConsumerOffset("shard-00000", "123456");
+    KinesisSystemConsumerOffset outCkpt = KinesisSystemConsumerOffset.parse(inCkpt.toString());
+    Assert.assertEquals(inCkpt, outCkpt);
+  }
+
+  /** Offsets differing in either shard id or sequence number must not be equal after a round trip. */
+  @Test
+  public void testInEquality() {
+    KinesisSystemConsumerOffset inCkpt = new KinesisSystemConsumerOffset("shard-00000", "123456");
+
+    // With different shardId
+    KinesisSystemConsumerOffset inCkpt1 = new KinesisSystemConsumerOffset("shard-00001", "123456");
+    KinesisSystemConsumerOffset outCkpt = KinesisSystemConsumerOffset.parse(inCkpt1.toString());
+    Assert.assertFalse("Offsets with different shard ids must not be equal", inCkpt.equals(outCkpt));
+
+    // With different seqNumber
+    inCkpt1 = new KinesisSystemConsumerOffset("shard-00000", "123457");
+    outCkpt = KinesisSystemConsumerOffset.parse(inCkpt1.toString());
+    Assert.assertFalse("Offsets with different sequence numbers must not be equal", inCkpt.equals(outCkpt));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
new file mode 100644
index 0000000..0533a29
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link SSPAllocator}: allocation order, exhaustion, and rejection of invalid frees.
+ *
+ * <p>Exception expectations are scoped to the single statement expected to throw. A class-level
+ * {@code @Test(expected = ...)} would also pass if a setup step threw the same exception.
+ */
+public class TestSSPAllocator {
+  private static final String SYSTEM = "kinesis";
+  private static final String STREAM = "stream";
+  private static final int NUM_PARTITIONS = 2;
+
+  @Test
+  public void testAllocateAndFree() throws NoAvailablePartitionException, NoSuchFieldException, IllegalAccessException {
+    List<SystemStreamPartition> ssps = createSsps(NUM_PARTITIONS);
+    SSPAllocator allocator = createAllocatorWithFreeSsps(ssps);
+
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+
+    // The test expects SSPs to be handed out in partition order.
+    SystemStreamPartition ssp = allocator.allocate(STREAM);
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+    Assert.assertEquals(ssps.get(0), ssp);
+
+    ssp = allocator.allocate(STREAM);
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(1)));
+    Assert.assertEquals(ssps.get(1), ssp);
+
+    allocator.free(ssps.get(1));
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+
+    allocator.free(ssps.get(0));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+  }
+
+  @Test
+  public void testAssignMoreThanMaxPartitions() throws NoAvailablePartitionException {
+    SSPAllocator allocator = createAllocatorWithFreeSsps(createSsps(NUM_PARTITIONS));
+
+    // Exhaust every partition; these allocations must succeed.
+    allocator.allocate(STREAM);
+    allocator.allocate(STREAM);
+
+    boolean thrown = false;
+    try {
+      allocator.allocate(STREAM);
+    } catch (NoAvailablePartitionException e) {
+      thrown = true;
+    }
+    Assert.assertTrue("Expected NoAvailablePartitionException once all partitions are allocated", thrown);
+  }
+
+  @Test
+  public void testFreeSameSspTwice() throws NoAvailablePartitionException {
+    SSPAllocator allocator = createAllocatorWithFreeSsps(createSsps(NUM_PARTITIONS));
+
+    SystemStreamPartition ssp = allocator.allocate(STREAM);
+    allocator.free(ssp);
+
+    boolean thrown = false;
+    try {
+      allocator.free(ssp);
+    } catch (IllegalArgumentException e) {
+      thrown = true;
+    }
+    Assert.assertTrue("Expected IllegalArgumentException when freeing an SSP twice", thrown);
+  }
+
+  @Test
+  public void testFreeUnallocatedSsp() throws NoAvailablePartitionException {
+    List<SystemStreamPartition> ssps = createSsps(NUM_PARTITIONS);
+    SSPAllocator allocator = createAllocatorWithFreeSsps(ssps);
+
+    // Only the first SSP is allocated; the second one is still available.
+    allocator.allocate(STREAM);
+
+    boolean thrown = false;
+    try {
+      allocator.free(ssps.get(1));
+    } catch (IllegalArgumentException e) {
+      thrown = true;
+    }
+    Assert.assertTrue("Expected IllegalArgumentException when freeing an unallocated SSP", thrown);
+  }
+
+  /** Creates SSPs for {@link #STREAM} with partition ids {@code 0 .. numPartitions - 1}, in order. */
+  private static List<SystemStreamPartition> createSsps(int numPartitions) {
+    List<SystemStreamPartition> ssps = new ArrayList<>();
+    IntStream.range(0, numPartitions)
+        .forEach(i -> ssps.add(new SystemStreamPartition(SYSTEM, STREAM, new Partition(i))));
+    return ssps;
+  }
+
+  /** Creates an allocator and registers each given SSP with it by calling {@code free}. */
+  private static SSPAllocator createAllocatorWithFreeSsps(List<SystemStreamPartition> ssps) {
+    SSPAllocator allocator = new SSPAllocator();
+    ssps.forEach(allocator::free);
+    return allocator;
+  }
+
+  /** Reads the allocator's private {@code availableSsps} map via reflection and checks whether {@code ssp} is in it. */
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(SSPAllocator sspAllocator, SystemStreamPartition ssp) throws NoSuchFieldException, IllegalAccessException {
+    Field f = sspAllocator.getClass().getDeclaredField("availableSsps");
+    f.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableSsps = (Map<String, Set<SystemStreamPartition>>) f.get(
+        sspAllocator);
+    return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 0fe3dfa..e50e816 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -24,9 +24,8 @@ include \
'samza-rest',
'samza-shell',
'samza-azure',
- 'samza-sql'
-
-
+ 'samza-sql',
+ 'samza-aws'
def scalaModules = [
'samza-core',
[2/2] samza git commit: SAMZA-1515; Implement a consumer for Kinesis
Posted by ja...@apache.org.
SAMZA-1515; Implement a consumer for Kinesis
Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>
Reviewers: Jagadish<ja...@apache.org>
Closes #368 from atoomula/kinesis
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9961023f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9961023f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9961023f
Branch: refs/heads/master
Commit: 9961023f7bf7c4b19804fb4e50a14c86d6fc9233
Parents: 5e68d62
Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>
Authored: Tue Nov 28 13:12:10 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 28 13:12:10 2017 -0800
----------------------------------------------------------------------
build.gradle | 33 ++
.../kinesis/KinesisAWSCredentialsProvider.java | 69 +++++
.../samza/system/kinesis/KinesisConfig.java | 287 ++++++++++++++++++
.../system/kinesis/KinesisSystemAdmin.java | 124 ++++++++
.../system/kinesis/KinesisSystemFactory.java | 87 ++++++
.../KinesisIncomingMessageEnvelope.java | 62 ++++
.../consumer/KinesisRecordProcessor.java | 208 +++++++++++++
.../KinesisRecordProcessorListener.java | 51 ++++
.../kinesis/consumer/KinesisSystemConsumer.java | 256 ++++++++++++++++
.../consumer/KinesisSystemConsumerOffset.java | 107 +++++++
.../consumer/NoAvailablePartitionException.java | 38 +++
.../system/kinesis/consumer/SSPAllocator.java | 73 +++++
.../metrics/KinesisSystemConsumerMetrics.java | 106 +++++++
.../system/kinesis/metrics/SamzaHistogram.java | 63 ++++
.../TestKinesisAWSCredentialsProvider.java | 60 ++++
.../samza/system/kinesis/TestKinesisConfig.java | 132 ++++++++
.../kinesis/TestKinesisSystemFactory.java | 115 +++++++
.../consumer/TestKinesisRecordProcessor.java | 301 +++++++++++++++++++
.../consumer/TestKinesisSystemConsumer.java | 270 +++++++++++++++++
.../TestKinesisSystemConsumerOffset.java | 48 +++
.../kinesis/consumer/TestSSPAllocator.java | 127 ++++++++
settings.gradle | 5 +-
22 files changed, 2619 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 59ff5f2..eddb11c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,6 +220,39 @@ project(':samza-azure') {
}
}
+project(':samza-aws') {
+ // Java module holding the Kinesis system (KinesisSystemFactory, KinesisSystemAdmin, KinesisSystemConsumer, ...).
+ apply plugin: 'java'
+ apply plugin: 'checkstyle'
+
+ dependencies {
+ // AWS Kinesis SDK, Kinesis Client Library (KCL) and Kinesis Producer Library artifacts.
+ compile "com.amazonaws:aws-java-sdk-kinesis:1.11.152"
+ compile "com.amazonaws:amazon-kinesis-client:1.7.5"
+ compile "com.amazonaws:amazon-kinesis-producer:0.10.0"
+ compile "io.dropwizard.metrics:metrics-core:3.1.2"
+ compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
+ compile "org.codehaus.jackson:jackson-mapper-asl:1.9.7"
+ compile project(':samza-api')
+ compile project(":samza-core_$scalaVersion")
+ compile "org.slf4j:slf4j-api:$slf4jVersion"
+ // NOTE(review): Apache HTTP client versions are pinned for runtime only; presumably to match what the AWS SDK
+ // expects on the classpath -- confirm.
+ runtime "org.apache.httpcomponents:httpclient:4.5.2"
+ runtime "org.apache.httpcomponents:httpcore:4.4.5"
+ testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
+ }
+
+ // Resolve dependencies from Maven Central.
+ repositories {
+ maven {
+ url "https://repo1.maven.org/maven2/"
+ }
+ }
+
+ // Use the root project's shared checkstyle rules.
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ toolVersion = "$checkstyleVersion"
+ }
+}
+
+
project(":samza-autoscaling_$scalaVersion") {
apply plugin: 'scala'
apply plugin: 'checkstyle'
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..a37cfb4
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+
+
+/**
+ * {@link AWSCredentialsProvider} that wraps an access key and secret key passed in directly.
+ *
+ * <p>If both keys are non-empty, {@link #getCredentials()} returns a {@link BasicAWSCredentials} built from them.
+ * If either key is null or empty, it returns credentials whose access key id and secret key are both {@code null}.
+ * Presumably this is so that a surrounding provider chain moves on to its next provider; confirm against the AWS SDK.
+ * Credentials are fixed at construction time, and {@link #refresh()} does nothing.
+ */
+public class KinesisAWSCredentialsProvider implements AWSCredentialsProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisAWSCredentialsProvider.class.getName());
+
+  private final AWSCredentials creds;
+
+  public KinesisAWSCredentialsProvider(String accessKey, String secretKey) {
+    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
+      creds = new BasicAWSCredentials(accessKey, secretKey);
+      LOG.info("Loaded credentials from KinesisAWSCredentialsProvider");
+    } else {
+      creds = new NullKeyCredentials();
+      LOG.info("Could not load credentials from KinesisAWSCredentialsProvider");
+    }
+  }
+
+  @Override
+  public AWSCredentials getCredentials() {
+    return creds;
+  }
+
+  @Override
+  public void refresh() {
+    // Keys are supplied once at construction time; there is nothing to reload.
+  }
+
+  /** Placeholder credentials whose access key id and secret key are both {@code null}. */
+  private static final class NullKeyCredentials implements AWSCredentials {
+    @Override
+    public String getAWSAccessKeyId() {
+      return null;
+    }
+
+    @Override
+    public String getAWSSecretKey() {
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
new file mode 100644
index 0000000..a4ac40d
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+
+
+/**
+ * Configs for Kinesis system. It contains three sets of configs:
+ * <ol>
+ * <li> Configs required by Samza Kinesis Consumer.
+ * <li> Configs that are AWS client specific provided at system scope {@link ClientConfiguration}
+ * <li> Configs that are KCL specific (could be provided either at system scope or stream scope)
+ * {@link KinesisClientLibConfiguration}
+ * </ol>
+ */
+public class KinesisConfig extends MapConfig {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName());
+
+ private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
+ private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
+
+ private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
+ private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
+
+ private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
+ private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
+ private static final String DEFAULT_CONFIG_PROXY_HOST = "";
+ private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
+ private static final int DEFAULT_CONFIG_PROXY_PORT = 0;
+
+ private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
+ private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
+
+ public KinesisConfig(Config config) {
+ super(config);
+ }
+
+ /**
+ * Return a set of streams from the config for a given system.
+ * @param system name of the system
+ * @return a set of streams
+ */
+ public Set<String> getKinesisStreams(String system) {
+ // build stream-level configs
+ Config streamsConfig = subset(String.format("systems.%s.streams.", system), true);
+ // all properties should now start with stream name
+ Set<String> streams = new HashSet<>();
+ streamsConfig.keySet().forEach(key -> {
+ String[] parts = key.split("\\.", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Ill-formatted stream config: " + key);
+ }
+ streams.add(parts[0]);
+ });
+ return streams;
+ }
+
+ /**
+ * Get KCL config for a given system stream.
+ * @param system name of the system
+ * @param stream name of the stream
+ * @param appName name of the application
+ * @return Stream scoped KCL configs required to build
+ * {@link KinesisClientLibConfiguration}
+ */
+ public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) {
+ ClientConfiguration clientConfig = getAWSClientConfig(system);
+ String workerId = appName + "-" + UUID.randomUUID();
+ InitialPositionInStream startPos = InitialPositionInStream.LATEST;
+ AWSCredentialsProvider provider = credentialsProviderForStream(system, stream);
+ KinesisClientLibConfiguration kinesisClientLibConfiguration =
+ new KinesisClientLibConfiguration(appName, stream, provider, workerId)
+ .withRegionName(getRegion(system, stream).getName())
+ .withKinesisClientConfig(clientConfig)
+ .withCloudWatchClientConfig(clientConfig)
+ .withDynamoDBClientConfig(clientConfig)
+ .withInitialPositionInStream(startPos)
+ .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics.
+ // First, get system scoped configs for KCL and override with configs set at stream scope.
+ setKinesisClientLibConfigs(
+ subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)), kinesisClientLibConfiguration);
+ setKinesisClientLibConfigs(subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)),
+ kinesisClientLibConfiguration);
+ return kinesisClientLibConfiguration;
+ }
+
+ /**
+ * Get the Kinesis secret key for the system stream
+ * @param system name of the system
+ * @param stream name of the stream
+ * @return Kinesis secret key
+ */
+ protected String getStreamSecretKey(String system, String stream) {
+ return get(String.format(CONFIG_STREAM_SECRET_KEY, system, stream));
+ }
+
+ /**
+ * Get SSL socket factory for the proxy for a given system
+ * @param system name of the system
+ * @return ConnectionSocketFactory
+ */
+ protected ConnectionSocketFactory getSSLSocketFactory(String system) {
+ return null;
+ }
+
+ /**
+ * @param system name of the system
+ * @return {@link ClientConfiguration} which has options controlling how the client connects to kinesis
+ * (eg: proxy settings, retry counts, etc)
+ */
+ ClientConfiguration getAWSClientConfig(String system) {
+ ClientConfiguration awsClientConfig = new ClientConfiguration();
+ setAwsClientConfigs(subset(String.format(CONFIG_AWS_CLIENT_CONFIG, system)), awsClientConfig);
+ awsClientConfig.getApacheHttpClientConfig().setSslSocketFactory(getSSLSocketFactory(system));
+ return awsClientConfig;
+ }
+
+ /**
+ * Get the proxy host as a system level config. This is needed when
+ * users need to go through a proxy for the Kinesis connections.
+ * @param system name of the system
+ * @return proxy host name or empty string if not defined
+ */
+ String getProxyHost(String system) {
+ return get(String.format(CONFIG_PROXY_HOST, system), DEFAULT_CONFIG_PROXY_HOST);
+ }
+
+ /**
+ * Get the proxy port number as a system level config. This is needed when
+ * users need to go through a proxy for the Kinesis connections.
+ * @param system name of the system
+ * @return proxy port number or 0 if not defined
+ */
+ int getProxyPort(String system) {
+ return getInt(String.format(CONFIG_PROXY_PORT, system), DEFAULT_CONFIG_PROXY_PORT);
+ }
+
+ /**
+ * Get the Kinesis region for the system stream
+ * @param system name of the system
+ * @param stream name of the stream
+ * @return Kinesis region
+ */
+ Region getRegion(String system, String stream) {
+ String name = get(String.format(CONFIG_STREAM_REGION, system, stream),
+ get(String.format(CONFIG_SYSTEM_REGION, system)));
+ return Region.getRegion(Regions.fromName(name));
+ }
+
+ /**
+ * Get the Kinesis access key name for the system stream
+ * @param system name of the system
+ * @param stream name of the stream
+ * @return Kinesis access key
+ */
+ String getStreamAccessKey(String system, String stream) {
+ return get(String.format(CONFIG_STREAM_ACCESS_KEY, system, stream));
+ }
+
+ /**
+ * Get the appropriate CredentialProvider for a given system stream.
+ * @param system name of the system
+ * @param stream name of the stream
+ * @return AWSCredentialsProvider
+ */
+ AWSCredentialsProvider credentialsProviderForStream(String system, String stream) {
+ // Try to load credentials in the following order:
+ // 1. Access key from the config and passed in secretKey
+ // 2. From the default credential provider chain (environment variables, system properties, AWS profile file, etc)
+ return new AWSCredentialsProviderChain(
+ new KinesisAWSCredentialsProvider(getStreamAccessKey(system, stream), getStreamSecretKey(system, stream)),
+ new DefaultAWSCredentialsProviderChain());
+ }
+
+ private void setAwsClientConfigs(Config config, ClientConfiguration clientConfig) {
+ for (Entry<String, String> entry : config.entrySet()) {
+ boolean found = false;
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (StringUtils.isEmpty(value)) {
+ continue;
+ }
+ for (Method method : ClientConfiguration.class.getMethods()) {
+ // For each property invoke the corresponding setter, if it exists
+ if (method.getName().equals("set" + key)) {
+ found = true;
+ Class<?> type = method.getParameterTypes()[0];
+ try {
+ if (type == long.class) {
+ method.invoke(clientConfig, Long.valueOf(value));
+ } else if (type == int.class) {
+ method.invoke(clientConfig, Integer.valueOf(value));
+ } else if (type == boolean.class) {
+ method.invoke(clientConfig, Boolean.valueOf(value));
+ } else if (type == String.class) {
+ method.invoke(clientConfig, value);
+ }
+ LOG.info("Loaded property " + key + " = " + value);
+ break;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Error trying to set field %s with the value '%s'", key, value), e);
+ }
+ }
+ }
+ if (!found) {
+ LOG.warn("Property " + key + " ignored as there is no corresponding set method");
+ }
+ }
+ }
+
+ private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
+ for (Entry<String, String> entry : config.entrySet()) {
+ boolean found = false;
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (StringUtils.isEmpty(value)) {
+ continue;
+ }
+ for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
+ if (method.getName().equals("with" + key)) {
+ found = true;
+ Class<?> type = method.getParameterTypes()[0];
+ try {
+ if (type == long.class) {
+ method.invoke(kinesisLibConfig, Long.valueOf(value));
+ } else if (type == int.class) {
+ method.invoke(kinesisLibConfig, Integer.valueOf(value));
+ } else if (type == boolean.class) {
+ method.invoke(kinesisLibConfig, Boolean.valueOf(value));
+ } else if (type == String.class) {
+ method.invoke(kinesisLibConfig, value);
+ } else if (type == InitialPositionInStream.class) {
+ method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase()));
+ }
+ LOG.info("Loaded property " + key + " = " + value);
+ break;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Error trying to set field %s with the value '%s'", key, value), e);
+ }
+ }
+ }
+ if (!found) {
+ LOG.warn("Property " + key + " ignored as there is no corresponding set method");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
new file mode 100644
index 0000000..4843276
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+
+
+/**
+ * A Kinesis-based implementation of SystemAdmin.
+ */
+public class KinesisSystemAdmin implements SystemAdmin {
+
+ private static final SystemStreamMetadata.SystemStreamPartitionMetadata SYSTEM_STREAM_PARTITION_METADATA =
+ new SystemStreamMetadata.SystemStreamPartitionMetadata(ExtendedSequenceNumber.TRIM_HORIZON.getSequenceNumber(),
+ ExtendedSequenceNumber.LATEST.getSequenceNumber(),
+ ExtendedSequenceNumber.LATEST.getSequenceNumber());
+
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemAdmin.class.getName());
+
+ private final String system;
+ private final KinesisConfig kConfig;
+
+ public KinesisSystemAdmin(String system, KinesisConfig kConfig) {
+ this.system = system;
+ this.kConfig = kConfig;
+ }
+
+ /**
+ * Source of truth for checkpointing is always kinesis and the offsets written to samza checkpoint topic are ignored.
+ * Hence, return null for the getOffsetsAfter for a supplied map of ssps.
+ */
+ @Override
+ public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+ Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>();
+
+ for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+ offsetsAfter.put(systemStreamPartition, null);
+ }
+
+ return offsetsAfter;
+ }
+
+ /**
+ * Source of truth for checkpointing is always kinesis and the offsets given by samza are always ignored by KCL.
+ * Hence, return a placeholder for each ssp.
+ */
+ @Override
+ public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+ return streamNames.stream().collect(Collectors.toMap(Function.identity(), this::createSystemStreamMetadata));
+ }
+
+ private SystemStreamMetadata createSystemStreamMetadata(String stream) {
+ LOG.info("create stream metadata for stream {} based on aws stream", stream);
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadata = new HashMap<>();
+ AmazonKinesisClient client = null;
+
+ try {
+ ClientConfiguration clientConfig = kConfig.getAWSClientConfig(system);
+ AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+ .withCredentials(kConfig.credentialsProviderForStream(system, stream))
+ .withClientConfiguration(clientConfig);
+ builder.setRegion(kConfig.getRegion(system, stream).getName());
+ client = (AmazonKinesisClient) builder.build();
+ StreamDescription desc = client.describeStream(stream).getStreamDescription();
+ IntStream.range(0, desc.getShards().size())
+ .forEach(i -> metadata.put(new Partition(i), SYSTEM_STREAM_PARTITION_METADATA));
+ } catch (Exception e) {
+ String errMsg = "couldn't load metadata for stream " + stream;
+ LOG.error(errMsg, e);
+ throw new SamzaException(errMsg, e);
+ } finally {
+ if (client != null) {
+ client.shutdown();
+ }
+ }
+
+ return new SystemStreamMetadata(stream, metadata);
+ }
+
+ /**
+ * Checkpoints are written to KCL and is always the source of truth. Format for Samza offsets is different from
+ * that of Kinesis checkpoint. Samza offsets are not comparable. Hence, return null.
+ */
+ @Override
+ public Integer offsetComparator(String offset1, String offset2) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
new file mode 100644
index 0000000..558e871
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+
+import org.apache.samza.system.kinesis.consumer.KinesisSystemConsumer;
+
+
+/**
+ * A Kinesis-based implementation of SystemFactory.
+ */
+public class KinesisSystemFactory implements SystemFactory {
+  /**
+   * Creates a {@link KinesisSystemConsumer} for the given system.
+   */
+  @Override
+  public SystemConsumer getConsumer(String system, Config config, MetricsRegistry registry) {
+    KinesisConfig kConfig = new KinesisConfig(config);
+    return new KinesisSystemConsumer(system, kConfig, registry);
+  }
+
+  /**
+   * This system does not provide a producer; always returns {@code null}.
+   */
+  @Override
+  public SystemProducer getProducer(String system, Config config, MetricsRegistry registry) {
+    return null;
+  }
+
+  /**
+   * Validates the job config for use with Kinesis and creates a {@link KinesisSystemAdmin}.
+   *
+   * @throws ConfigException if {@link #validateConfig(String, Config)} rejects the config
+   */
+  @Override
+  public SystemAdmin getAdmin(String system, Config config) {
+    validateConfig(system, config);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    return new KinesisSystemAdmin(system, kConfig);
+  }
+
+  /**
+   * Rejects configurations that the Kinesis consumer cannot support.
+   * <ul>
+   *   <li>the SSP grouper is not AllSspToSingleTaskGrouperFactory,
+   *   <li>a stream of this system is configured as a broadcast stream, or
+   *   <li>a Kinesis stream of this system is configured as a bootstrap stream.
+   * </ul>
+   *
+   * @throws ConfigException describing the first violation found
+   */
+  protected void validateConfig(String system, Config config) {
+    // Kinesis system does not support groupers other than AllSspToSingleTaskGrouper
+    JobConfig jobConfig = new JobConfig(config);
+    if (!jobConfig.getSystemStreamPartitionGrouperFactory().equals(
+        AllSspToSingleTaskGrouperFactory.class.getCanonicalName())) {
+      String errMsg = String.format("Incorrect Grouper %s used for KinesisSystemConsumer %s. Please set the %s config"
+          + " to %s.", jobConfig.getSystemStreamPartitionGrouperFactory(), system,
+          JobConfig.SSP_GROUPER_FACTORY(), AllSspToSingleTaskGrouperFactory.class.getCanonicalName());
+      throw new ConfigException(errMsg);
+    }
+
+    // Kinesis streams cannot be configured as broadcast streams
+    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    if (taskConfig.getBroadcastSystemStreams().stream().anyMatch(ss -> system.equals(ss.getSystem()))) {
+      throw new ConfigException("Kinesis streams cannot be configured as broadcast streams.");
+    }
+
+    // Kinesis streams cannot be configured as bootstrap streams.
+    // StreamConfig depends only on the config, so build it once instead of once per stream.
+    KinesisConfig kConfig = new KinesisConfig(config);
+    StreamConfig streamConfig = new StreamConfig(kConfig);
+    kConfig.getKinesisStreams(system).forEach(stream -> {
+      SystemStream ss = new SystemStream(system, stream);
+      if (streamConfig.getBootstrapEnabled(ss)) {
+        throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
new file mode 100644
index 0000000..95e6b6a
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.Date;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Kinesis record with payload and some metadata.
+ */
+public class KinesisIncomingMessageEnvelope extends IncomingMessageEnvelope {
+ private final String shardId;
+ private final String sequenceNumber;
+ private final Date approximateArrivalTimestamp;
+
+ public KinesisIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
+ Object message, String shardId, String sequenceNumber, Date approximateArrivalTimestamp) {
+ super(systemStreamPartition, offset, key, message);
+ this.shardId = shardId;
+ this.sequenceNumber = sequenceNumber;
+ this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
+
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public Date getApproximateArrivalTimestamp() {
+ return approximateArrivalTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "KinesisIncomingMessageEnvelope:: shardId:" + shardId + ", sequenceNumber:" + sequenceNumber
+ + ", approximateArrivalTimestamp:" + approximateArrivalTimestamp + ", message:" + getMessage();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
new file mode 100644
index 0000000..53ff27f
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Record processor for AWS kinesis stream. It does the following:
+ * <ul>
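+ * <li> when a shard is assigned by KCL in initialize API, it records the shard id. The ssp itself is allocated from
+ * sspAllocator by the consumer's record processor factory and passed to the processor's constructor.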
+ * <li> when records are received in processRecords API, it translates them to IncomingMessageEnvelope and enqueues
+ * the resulting envelope in the appropriate blocking buffer queue.
+ * <li> when checkpoint API is called by samza, it checkpoints via KCL to Kinesis.
+ * <li> when shutdown API is called by KCL, based on the terminate reason, it takes necessary action.
+ * </ul>
+ *
+ * initialize, processRecords and shutdown APIs are never called concurrently on a processor instance. However,
+ * checkpoint API could be called by Samza thread while processRecords and shutdown callback APIs are invoked by KCL.
+ * Please note that the APIs for different record processor instances could be called concurrently.
+ */
+
+public class KinesisRecordProcessor implements IRecordProcessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class.getName());
+  static final long POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS = 1000;
+
+  private final SystemStreamPartition ssp;
+  private final KinesisRecordProcessorListener listener;
+
+  // Written by KCL callbacks (initialize/processRecords) but read by the Samza thread via checkpoint() and
+  // getShardId(), so these must be volatile for the writes to be visible across threads.
+  private volatile String shardId;
+  private volatile IRecordProcessorCheckpointer checkpointer;
+  private volatile ExtendedSequenceNumber initSeqNumber;
+
+  private volatile ExtendedSequenceNumber lastProcessedRecordSeqNumber;
+  private volatile ExtendedSequenceNumber lastCheckpointedRecordSeqNumber;
+
+  // Only accessed from KCL callbacks, which are never invoked concurrently on one processor instance.
+  private boolean shutdownRequested = false;
+
+  KinesisRecordProcessor(SystemStreamPartition ssp, KinesisRecordProcessorListener listener) {
+    this.ssp = ssp;
+    this.listener = listener;
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
+   * (via processRecords). Records the shard id and the initial sequence number for this processor.
+   *
+   * @param initializationInput Provides information related to initialization
+   */
+  @Override
+  public void initialize(InitializationInput initializationInput) {
+    Validate.isTrue(listener != null, "There is no listener set for the processor.");
+    initSeqNumber = initializationInput.getExtendedSequenceNumber();
+    shardId = initializationInput.getShardId();
+    LOG.info("Initialization done for {} with sequence {}", this,
+        initializationInput.getExtendedSequenceNumber().getSequenceNumber());
+  }
+
+  /**
+   * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
+   * application. Upon fail over, the new instance will get records with sequence number greater than the checkpoint
+   * position for each partition key.
+   *
+   * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
+   *                            to them (eg checkpointing).
+   */
+  @Override
+  public void processRecords(ProcessRecordsInput processRecordsInput) {
+    // KCL does not send any records to the processor that was shutdown.
+    Validate.isTrue(!shutdownRequested,
+        String.format("KCL returned records after shutdown is called on the processor %s.", this));
+    // KCL always gives reference to the same checkpointer instance for a given processor instance.
+    checkpointer = processRecordsInput.getCheckpointer();
+    List<Record> records = processRecordsInput.getRecords();
+    // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true.
+    if (!records.isEmpty()) {
+      lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber());
+      listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest());
+    }
+  }
+
+  /**
+   * Invoked by the Samza thread to commit checkpoint for the shard owned by the record processor instance.
+   *
+   * @param seqNumber sequenceNumber to checkpoint for the shard owned by this processor instance.
+   * @throws SamzaException if KCL reports an invalid internal state or the checkpoint is throttled
+   */
+  public void checkpoint(String seqNumber) {
+    ExtendedSequenceNumber seqNumberToCheckpoint = new ExtendedSequenceNumber(seqNumber);
+    if (initSeqNumber.compareTo(seqNumberToCheckpoint) > 0) {
+      LOG.warn("Samza called checkpoint with seqNumber {} smaller than initial seqNumber {} for {}. Ignoring it!",
+          seqNumber, initSeqNumber, this);
+      return;
+    }
+
+    // Read the volatile once so the null check and the use below see the same instance.
+    IRecordProcessorCheckpointer currentCheckpointer = checkpointer;
+    if (currentCheckpointer == null) {
+      // checkpointer could be null as a result of shard re-assignment before the first record is processed.
+      LOG.warn("Ignoring checkpointing for {} with seqNumber {} because of re-assignment.", this, seqNumber);
+      return;
+    }
+
+    try {
+      currentCheckpointer.checkpoint(seqNumber);
+      lastCheckpointedRecordSeqNumber = seqNumberToCheckpoint;
+    } catch (ShutdownException e) {
+      // This can happen as a result of shard re-assignment.
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Dropping the checkpoint.",
+          this, seqNumber);
+      LOG.warn(msg, e);
+    } catch (InvalidStateException e) {
+      // This can happen when KCL encounters issues with internal state, eg: dynamoDB table is not found
+      String msg =
+          String.format("Checkpointing %s with seqNumber %s failed with exception.", this, seqNumber);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    } catch (ThrottlingException e) {
+      // Throttling back-off is handled by KCL via the client lib configuration properties. If we still get an
+      // exception despite that back-off, the checkpoint rate is more than the table can sustain, so fail loudly
+      // and keep the original cause for diagnosis.
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Checkpoint interval is"
+          + " too aggressive for the provisioned throughput of the dynamoDB table where the checkpoints are stored."
+          + " Either reduce the checkpoint interval -or- increase the throughput of dynamoDB table.", this,
+          seqNumber);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
+   * RecordProcessor instance.
+   *
+   * @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
+   *                      processor.
+   */
+  @Override
+  public void shutdown(ShutdownInput shutdownInput) {
+    LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());
+
+    Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
+    shutdownRequested = true;
+    // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
+    // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
+    // progress.
+    if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
+      // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
+      // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
+      // to consume from the child shard(s).
+      try {
+        LOG.info("Waiting for all the records for {} to be processed.", this);
+        // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
+        // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
+        // be null.
+        while (lastProcessedRecordSeqNumber != null
+            && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
+          Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
+        }
+        LOG.info("Final checkpoint for {} before shutting down.", this);
+        shutdownInput.getCheckpointer().checkpoint();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so the calling KCL thread can observe it.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting to commit the final checkpoint in the parent shard {}", this, e);
+      } catch (Exception e) {
+        LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
+      }
+    }
+    listener.onShutdown(ssp);
+  }
+
+  String getShardId() {
+    return shardId;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("KinesisRecordProcessor: ssp %s shard %s hashCode %s", ssp, shardId, hashCode());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
new file mode 100644
index 0000000..72d86b9
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Listener interface implemented by consumer to be notified when {@link KinesisRecordProcessor} receives records and
+ * is ready to shutdown.
+ */
+public interface KinesisRecordProcessorListener {
+  /**
+   * Called from {@link KinesisRecordProcessor#processRecords(com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput)}
+   * whenever the processor is handed a non-empty batch of records.
+   *
+   * @param ssp the Samza partition the records are delivered to
+   * @param records the Kinesis records in the batch
+   * @param millisBehindLatest how far, in milliseconds, this batch lags behind the tip of the stream
+   */
+  void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest);
+
+  /**
+   * Called from {@link KinesisRecordProcessor#shutdown(com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput)}
+   * as the last step of shutting a processor down.
+   *
+   * @param ssp the Samza partition whose processor is going away
+   */
+  void onShutdown(SystemStreamPartition ssp);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
new file mode 100644
index 0000000..6afffd3
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointListener;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+import org.apache.samza.util.BlockingEnvelopeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import com.amazonaws.services.kinesis.model.Record;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * The system consumer for Kinesis, extending the {@link BlockingEnvelopeMap}.
+ *
+ * The system consumer creates a KinesisWorker per stream in its own thread by providing a RecordProcessorFactory.
+ * Kinesis Client Library (KCL) uses this factory to instantiate a KinesisRecordProcessor for each shard in the Kinesis
+ * stream. KCL pushes data records to the appropriate record processor and the processor is responsible for processing
+ * the resulting records and placing them into a blocking queue in {@link BlockingEnvelopeMap}.
+ *
+ * <pre>
+ * {@code
+ * Shard1 +----------------------+
+ * . --------------------> |KinesisRecordProcessor|
+ * Stream1 | Shard2 +----------------------+
+ * +-------------+ +-----------------------------+ +----------------------+
+ * .--------------->| Worker |---->| RecordProcessorFactory | ----> |KinesisRecordProcessor|
+ * | +-------------+ +-------------+---------------+ +----------------------+
+ * | | Shard3 +----------------------+
+ * | . --------------------> |KinesisRecordProcessor|
+ * | +----------------------+
+ * | Stream2
+ * +---------------------+ +-------------+ +-----------------------------+ +-------+
+ * |KinesisSystemConsumer|---->| Worker |---->| RecordProcessorFactory |------->| ... |
+ * +---------------------+ +-------------+ +-----------------------------+ +-------+
+ * |
+ * |
+ * |
+ * |
+ * | +-----------+
+ * . -------------->| ... |
+ * +-----------+
+ * }
+ * </pre>
+ * Since KinesisSystemConsumer uses KCL, the checkpoint state is stored in a dynamoDB table which is maintained by KCL.
+ * KinesisSystemConsumer implements CheckpointListener to commit checkpoints via KCL.
+ */
+
+public class KinesisSystemConsumer extends BlockingEnvelopeMap implements CheckpointListener, KinesisRecordProcessorListener {
+
+  private static final int MAX_BLOCKING_QUEUE_SIZE = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemConsumer.class.getName());
+
+  private final String system;
+  private final KinesisConfig kConfig;
+  private final KinesisSystemConsumerMetrics metrics;
+  private final SSPAllocator sspAllocator;
+
+  private final Set<String> streams = new HashSet<>();
+  private final Map<SystemStreamPartition, KinesisRecordProcessor> processors = new ConcurrentHashMap<>();
+  private final List<Worker> workers = new LinkedList<>();
+
+  // Null until start() launches at least one worker.
+  private ExecutorService executorService;
+
+  // Set from a KCL thread when creating a record processor fails; surfaced to Samza on the next poll().
+  private volatile Exception callbackException;
+
+  public KinesisSystemConsumer(String systemName, KinesisConfig kConfig, MetricsRegistry registry) {
+    super(registry, System::currentTimeMillis, null);
+    this.system = systemName;
+    this.kConfig = kConfig;
+    this.metrics = new KinesisSystemConsumerMetrics(registry);
+    this.sspAllocator = new SSPAllocator();
+  }
+
+  /**
+   * Uses a bounded queue per ssp so that KCL threads block (back-pressure) once MAX_BLOCKING_QUEUE_SIZE envelopes
+   * are buffered.
+   */
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE);
+  }
+
+  @Override
+  protected void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+    try {
+      super.put(ssp, envelope);
+    } catch (Exception e) {
+      LOG.error("Exception while putting record. Shutting down SystemStream {}", ssp.getSystemStream(), e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Registers an ssp and records its stream. The offset is ignored because KCL's checkpoints are used instead.
+   */
+  @Override
+  public void register(SystemStreamPartition ssp, String offset) {
+    LOG.info("Register called with ssp {} and offset {}. Offset will be ignored.", ssp, offset);
+    String stream = ssp.getStream();
+    streams.add(stream);
+    sspAllocator.free(ssp);
+    super.register(ssp, offset);
+  }
+
+  /**
+   * Starts one KCL worker per registered stream, each on its own thread. Does nothing if no stream was registered.
+   */
+  @Override
+  public void start() {
+    LOG.info("Start samza consumer for system {}.", system);
+
+    metrics.initializeMetrics(streams);
+
+    if (streams.isEmpty()) {
+      // Executors.newFixedThreadPool rejects a pool size of 0, and there is nothing to consume anyway.
+      LOG.warn("No streams registered for system {}. No Kinesis workers will be started.", system);
+      return;
+    }
+
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("kinesis-worker-thread-" + system + "-%d")
+        .build();
+    // launch kinesis workers in separate threads, one per stream
+    executorService = Executors.newFixedThreadPool(streams.size(), namedThreadFactory);
+
+    for (String stream : streams) {
+      // KCL Dynamodb table is used for storing the state of processing. By default, the table name is the same as the
+      // application name. Dynamodb table name must be unique for a given account and region (even across different
+      // streams). So, let's create the default one with the combination of job name, job id and stream name. The table
+      // name could be changed by providing a different TableName via KCL specific config.
+      String kinesisApplicationName =
+          kConfig.get(JobConfig.JOB_NAME()) + "-" + kConfig.get(JobConfig.JOB_ID()) + "-" + stream;
+
+      Worker worker = new Worker.Builder()
+          .recordProcessorFactory(createRecordProcessorFactory(stream))
+          .config(kConfig.getKinesisClientLibConfig(system, stream, kinesisApplicationName))
+          .build();
+
+      workers.add(worker);
+
+      // launch kinesis workers in separate threads, one per stream
+      executorService.execute(worker);
+      LOG.info("Started worker for system {} stream {}.", system, stream);
+    }
+  }
+
+  /**
+   * Fails fast with the exception recorded by a KCL callback, if any; otherwise delegates to the blocking map.
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException {
+    if (callbackException != null) {
+      throw new SamzaException(callbackException);
+    }
+    return super.poll(ssps, timeout);
+  }
+
+  /**
+   * Shuts down all KCL workers and their threads. Safe to call even if {@link #start()} never launched any worker.
+   */
+  @Override
+  public void stop() {
+    LOG.info("Stop samza consumer for system {}.", system);
+    workers.forEach(Worker::shutdown);
+    workers.clear();
+    // executorService is null when start() was never called or returned early because no streams were registered.
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+    LOG.info("Kinesis system consumer executor service for system {} is shutdown.", system);
+  }
+
+  // package-private for tests
+  IRecordProcessorFactory createRecordProcessorFactory(String stream) {
+    return () -> {
+      // This code is executed in Kinesis thread context.
+      try {
+        SystemStreamPartition ssp = sspAllocator.allocate(stream);
+        KinesisRecordProcessor processor = new KinesisRecordProcessor(ssp, KinesisSystemConsumer.this);
+        KinesisRecordProcessor prevProcessor = processors.put(ssp, processor);
+        Validate.isTrue(prevProcessor == null, String.format("Adding new kinesis record processor %s while the"
+            + " previous processor %s for the same ssp %s is still active.", processor, prevProcessor, ssp));
+        return processor;
+      } catch (Exception e) {
+        callbackException = e;
+        // This exception is the result of kinesis dynamic shard splits due to which sspAllocator ran out of free ssps.
+        // Set the failed state in consumer which will eventually result in stopping the container. A manual job restart
+        // will be required at this point. After the job restart, the newly created shards will be discovered and enough
+        // ssps will be added to sspAllocator freePool.
+        throw new SamzaException(e);
+      }
+    };
+  }
+
+  /**
+   * Forwards each checkpoint to the processor currently owning the ssp. A checkpoint is dropped when the ssp has no
+   * live processor or the processor now owns a different shard than the one in the offset.
+   */
+  @Override
+  public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
+    LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
+    sspOffsets.forEach((ssp, offset) -> {
+      KinesisRecordProcessor processor = processors.get(ssp);
+      KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
+      if (processor == null) {
+        LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
+            + " checkpoint {}.", ssp, offset);
+      } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
+        LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
+            + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
+            kinesisOffset.getShardId(), offset);
+      } else {
+        processor.checkpoint(kinesisOffset.getSeqNumber());
+      }
+    });
+  }
+
+  @Override
+  public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+    metrics.updateMillisBehindLatest(ssp.getStream(), millisBehindLatest);
+    records.forEach(record -> put(ssp, translate(ssp, record)));
+  }
+
+  @Override
+  public void onShutdown(SystemStreamPartition ssp) {
+    processors.remove(ssp);
+    sspAllocator.free(ssp);
+  }
+
+  // Converts a Kinesis record into an envelope whose offset encodes the owning shard id and the sequence number.
+  private IncomingMessageEnvelope translate(SystemStreamPartition ssp, Record record) {
+    String shardId = processors.get(ssp).getShardId();
+    byte[] payload = new byte[record.getData().remaining()];
+
+    metrics.updateMetrics(ssp.getStream(), record);
+    record.getData().get(payload);
+    KinesisSystemConsumerOffset offset = new KinesisSystemConsumerOffset(shardId, record.getSequenceNumber());
+    return new KinesisIncomingMessageEnvelope(ssp, offset.toString(), record.getPartitionKey(),
+        payload, shardId, record.getSequenceNumber(), record.getApproximateArrivalTimestamp());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..13296ca
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Kinesis system consumer related checkpoint information that is stored in the IncomingMessageEnvelope offset.
+ *
+ * It contains the following metadata:
+ * <ul>
+ *   <li> shardId: Kinesis stream shardId.
+ *   <li> seqNumber: sequence number in the above shard.
+ * </ul>
+ *
+ * The offset string is the JSON form produced by {@link JsonSerdeV2}. Both {@link #toString()} and
+ * {@link #parse(String)} convert between that string and bytes using UTF-8, so they agree regardless of the
+ * platform default charset.
+ *
+ * Please note that the source of truth for checkpointing is the AWS dynamoDB table corresponding to the application.
+ * The offset that is stored in Samza checkpoint topic is not used.
+ */
+public class KinesisSystemConsumerOffset {
+
+  @JsonProperty("shardId")
+  private String shardId;
+  @JsonProperty("seqNumber")
+  private String seqNumber;
+
+  /**
+   * Creates an offset; also the Jackson creator used during deserialization. Arguments are not validated.
+   *
+   * @param shardId Kinesis shard id
+   * @param seqNumber sequence number of a record within {@code shardId}
+   */
+  @JsonCreator
+  KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
+      @JsonProperty("seqNumber") String seqNumber) {
+    this.shardId = shardId;
+    this.seqNumber = seqNumber;
+  }
+
+  String getShardId() {
+    return shardId;
+  }
+
+  String getSeqNumber() {
+    return seqNumber;
+  }
+
+  /**
+   * Deserializes an offset string, such as one produced by {@link #toString()}.
+   *
+   * @param metadata JSON representation of the offset
+   * @return the parsed offset
+   * @throws SamzaException if {@code metadata} is null
+   */
+  static KinesisSystemConsumerOffset parse(String metadata) {
+    if (metadata == null) {
+      // Surface a missing offset as a SamzaException rather than a bare NullPointerException.
+      throw new SamzaException("Cannot parse a null KinesisSystemConsumerOffset.");
+    }
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    return jsonSerde.fromBytes(metadata.getBytes(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Serializes this offset to its JSON string form; the inverse of {@link #parse(String)}.
+   */
+  @Override
+  public String toString() {
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    // Decode as UTF-8 to mirror parse(); new String(byte[]) would use the platform default charset.
+    return new String(jsonSerde.toBytes(this), StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof KinesisSystemConsumerOffset)) {
+      return false;
+    }
+    KinesisSystemConsumerOffset that = (KinesisSystemConsumerOffset) o;
+    return Objects.equals(shardId, that.getShardId()) && Objects.equals(seqNumber, that.getSeqNumber());
+  }
+
+  /**
+   * Null-safe hash consistent with {@link #equals(Object)}. When both fields are non-null it equals
+   * {@code 31 * shardId.hashCode() + seqNumber.hashCode()}.
+   */
+  @Override
+  public int hashCode() {
+    return 31 * Objects.hashCode(shardId) + Objects.hashCode(seqNumber);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
new file mode 100644
index 0000000..6caf760
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+
+/**
+ * Checked exception signalling that {@link SSPAllocator} has no free SystemStreamPartition left to hand out
+ * for the requested stream.
+ */
+public class NoAvailablePartitionException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message description of why no partition could be allocated
+   */
+  public NoAvailablePartitionException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of why no partition could be allocated
+   * @param cause exception that led to the allocation failure
+   */
+  public NoAvailablePartitionException(String message, Exception cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
new file mode 100644
index 0000000..4b7cff8
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Hands out Samza SystemStreamPartitions (SSPs) from per-stream free pools. Two operations are exposed:
+ * <ul>
+ *   <li> allocate: removes and returns one free ssp of the given stream.
+ *   <li> free: puts an ssp back into its stream's free pool.
+ * </ul>
+ * When a stream's pool is empty, allocate throws NoAvailablePartitionException; dynamic shard splits can lead to
+ * this. Both operations are synchronized on the allocator instance.
+ */
+class SSPAllocator {
+  private static final Logger LOG = LoggerFactory.getLogger(SSPAllocator.class.getName());
+
+  // stream name -> ssps of that stream that are currently unallocated
+  private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();
+
+  /**
+   * Takes one free ssp of {@code stream} out of its pool.
+   *
+   * @param stream stream name; its pool is created by {@link #free(SystemStreamPartition)}
+   * @return the allocated ssp
+   * @throws NoAvailablePartitionException if the stream's pool is empty
+   * @throws IllegalArgumentException if no pool exists for {@code stream}
+   */
+  synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
+    Set<SystemStreamPartition> pool = availableSsps.get(stream);
+    Validate.isTrue(pool != null, String.format("availableSsps is null for stream %s", stream));
+
+    if (pool.isEmpty()) {
+      // Set a flag in system consumer so that it could throw an exception in the subsequent poll.
+      throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
+          + " registered. Could be the result of dynamic resharding.", stream));
+    }
+
+    SystemStreamPartition allocated = pool.iterator().next();
+    pool.remove(allocated);
+
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", allocated.getSystemStream(),
+        pool.size());
+    return allocated;
+  }
+
+  /**
+   * Returns {@code ssp} to the free pool of its stream, creating that pool on first use.
+   *
+   * @param ssp partition to release
+   * @throws IllegalArgumentException if {@code ssp} is already in the free pool
+   */
+  synchronized void free(SystemStreamPartition ssp) {
+    Set<SystemStreamPartition> pool = availableSsps.computeIfAbsent(ssp.getStream(), key -> new HashSet<>());
+    Validate.isTrue(pool.add(ssp), String.format("Ssp %s is already in free pool.", ssp));
+
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", ssp.getSystemStream(), pool.size());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
new file mode 100644
index 0000000..2f42981
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * KinesisSystemConsumerMetrics class has per-stream metrics and aggregate metrics across kinesis consumers
+ */
+
+public class KinesisSystemConsumerMetrics {
+
+ private final MetricsRegistry registry;
+
+ // Aggregate metrics across all kinesis system consumers
+ private static Counter aggEventReadRate = null;
+ private static Counter aggEventByteReadRate = null;
+ private static SamzaHistogram aggReadLatency = null;
+ private static SamzaHistogram aggMillisBehindLatest = null;
+
+ // Per-stream metrics
+ private Map<String, Counter> eventReadRates;
+ private Map<String, Counter> eventByteReadRates;
+ private Map<String, SamzaHistogram> readLatencies;
+ private Map<String, SamzaHistogram> millisBehindLatest;
+
+ private static final Object LOCK = new Object();
+
+ private static final String AGGREGATE = "aggregate";
+ private static final String EVENT_READ_RATE = "eventReadRate";
+ private static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
+ private static final String READ_LATENCY = "readLatency";
+ private static final String MILLIS_BEHIND_LATEST = "millisBehindLatest";
+
+ public KinesisSystemConsumerMetrics(MetricsRegistry registry) {
+ this.registry = registry;
+ }
+
+ public void initializeMetrics(Set<String> streamNames) {
+ eventReadRates = streamNames.stream()
+ .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+ eventByteReadRates = streamNames.stream()
+ .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+ readLatencies = streamNames.stream()
+ .collect(Collectors.toConcurrentMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+ millisBehindLatest = streamNames.stream()
+ .collect(Collectors.toConcurrentMap(Function.identity(),
+ x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
+
+ // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
+ synchronized (LOCK) {
+ if (aggEventReadRate == null) {
+ aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
+ aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
+ aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);
+ aggMillisBehindLatest = new SamzaHistogram(registry, AGGREGATE, MILLIS_BEHIND_LATEST);
+ }
+ }
+ }
+
+ public void updateMillisBehindLatest(String stream, Long millisBehindLatest) {
+ this.millisBehindLatest.get(stream).update(millisBehindLatest);
+ aggMillisBehindLatest.update(millisBehindLatest);
+ }
+
+ public void updateMetrics(String stream, Record record) {
+ eventReadRates.get(stream).inc();
+ aggEventReadRate.inc();
+
+ long recordSize = record.getData().array().length + record.getPartitionKey().length();
+ eventByteReadRates.get(stream).inc(recordSize);
+ aggEventByteReadRate.inc(recordSize);
+
+ long latencyMs = Duration.between(Instant.now(), record.getApproximateArrivalTimestamp().toInstant()).toMillis();
+ readLatencies.get(stream).update(latencyMs);
+ aggReadLatency.update(latencyMs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..29964dc
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+
+
+/**
+ * Publishes percentiles of a Codahale {@link Histogram} as Samza {@link Gauge}s. There is one gauge per requested
+ * percentile in (0, 100], registered in {@code group} under the name {@code name + "_" + percentile}
+ * (e.g. {@code readLatency_50.0}).
+ */
+class SamzaHistogram {
+
+  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+  private final MetricsRegistry registry;
+  private final Histogram histogram;
+  private final Map<Double, Gauge<Double>> gauges;
+
+  SamzaHistogram(MetricsRegistry registry, String group, String name) {
+    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+  }
+
+  /**
+   * @param registry registry the percentile gauges are created in
+   * @param group metric group for the gauges
+   * @param name base metric name; each gauge appends {@code "_" + percentile}
+   * @param percentiles percentiles to publish; values outside (0, 100] are ignored
+   */
+  SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+    this.registry = registry;
+    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+    // Each percentile gets its own gauge name; reusing one name would make all percentiles share a gauge.
+    this.gauges = percentiles.stream()
+        .filter(x -> x > 0 && x <= 100)
+        .collect(Collectors.toMap(Function.identity(),
+            x -> this.registry.newGauge(group, name + "_" + String.valueOf(x), 0D)));
+  }
+
+  /**
+   * Adds {@code value} to the histogram and refreshes every percentile gauge from a new snapshot.
+   */
+  synchronized void update(long value) {
+    histogram.update(value);
+    Snapshot values = histogram.getSnapshot();
+    // Iterate the gauges, not the raw percentile list, so filtered-out percentiles cannot cause a lookup miss.
+    gauges.forEach((percentile, gauge) -> gauge.set(values.getValue(percentile / 100)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..93887ed
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests {@link KinesisAWSCredentialsProvider}. The configured keys are returned when both are set, and both keys
+ * come back null when either one is missing.
+ */
+public class TestKinesisAWSCredentialsProvider {
+
+  @Test
+  public void testCredentialsProviderWithNonNullKeys() {
+    String accessKey = "accessKey";
+    String secretKey = "secretKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(accessKey, secretKey);
+    // assertEquals takes (expected, actual); the right order keeps failure messages accurate.
+    assertEquals(accessKey, credProvider.getCredentials().getAWSAccessKeyId());
+    assertEquals(secretKey, credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullAccessKey() {
+    String secretKey = "secretKey";
+    assertNullKeys(new KinesisAWSCredentialsProvider(null, secretKey));
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullSecretKey() {
+    String accessKey = "accessKey";
+    assertNullKeys(new KinesisAWSCredentialsProvider(accessKey, null));
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullKeys() {
+    assertNullKeys(new KinesisAWSCredentialsProvider(null, null));
+  }
+
+  /** Asserts that the provider's credentials carry neither an access key nor a secret key. */
+  private static void assertNullKeys(KinesisAWSCredentialsProvider credProvider) {
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
new file mode 100644
index 0000000..56e4810
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests {@link KinesisConfig}: stream discovery, region and credential lookup, AWS client configs, and KCL configs
+ * including per-stream overrides and unknown keys.
+ */
+public class TestKinesisConfig {
+  @Test
+  public void testGetKinesisStreams() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop1", "value1");
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop2", "value2");
+    kv.put("systems.kinesis.streams.kinesis-stream2.prop1", "value3");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    Set<String> streams = kConfig.getKinesisStreams("kinesis");
+    // Verify the parsed names as well as the count, so a wrong split of the config keys is caught.
+    assertEquals(2, streams.size());
+    assertTrue(streams.contains("kinesis-stream1"));
+    assertTrue(streams.contains("kinesis-stream2"));
+  }
+
+  @Test
+  public void testKinesisConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+    String ssConfigPrefix = String.format("systems.%s.streams.%s.", system, stream);
+
+    kv.put("sensitive." + ssConfigPrefix + "aws.secretKey", "secretKey");
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+    kv.put(ssConfigPrefix + "aws.accessKey", "accessKey");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("us-east-1", kConfig.getRegion(system, stream).getName());
+    assertEquals("accessKey", kConfig.getStreamAccessKey(system, stream));
+    assertEquals("secretKey", kConfig.getStreamSecretKey(system, stream));
+  }
+
+  @Test
+  public void testAwsClientConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // Aws Client Configs
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyHost", "hostName");
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyPort", "8080");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("hostName", kConfig.getAWSClientConfig(system).getProxyHost());
+    assertEquals(8080, kConfig.getAWSClientConfig(system).getProxyPort());
+  }
+
+  @Test
+  public void testKclConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // region config is required for setting kcl config.
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+
+    // Kcl Configs
+    kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table");
+    kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100");
+    kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true");
+    kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
+    // override one of the Kcl configs for kinesis-stream1
+    kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app");
+
+    assertEquals("sample-table", kclConfig.getTableName());
+    assertEquals(100, kclConfig.getMaxRecords());
+    assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList());
+    assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream());
+
+    // verify that the overridden config is applied for kinesis-stream1
+    kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app");
+    assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream());
+  }
+
+  @Test
+  public void testgetKCLConfigWithUnknownConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.aws.region", "us-east-1");
+    kv.put("systems.kinesis.streams.kinesis-stream.aws.kcl.random", "value");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    // Should not throw any exception and just ignore the unknown configs.
+    kConfig.getKinesisClientLibConfig("kinesis", "kinesis-stream", "sample-app");
+  }
+}