You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/08 01:15:32 UTC
samza git commit: SAMZA-1437; Added tests, make eventData creation extensible
Repository: samza
Updated Branches:
refs/heads/master 52d8ddd6e -> 05113c339
SAMZA-1437; Added tests, make eventData creation extensible
vjagadish1989 Required for internal custom eventData creation
Author: Daniel Chen <29...@users.noreply.github.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #352 from dxichen/create-event-data-extensible
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/05113c33
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/05113c33
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/05113c33
Branch: refs/heads/master
Commit: 05113c3395f92b74e41c75b0ca28153fb3dbf915
Parents: 52d8ddd
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Tue Nov 7 17:15:28 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 7 17:15:28 2017 -0800
----------------------------------------------------------------------
.../producer/EventHubSystemProducer.java | 2 +-
.../consumer/TestEventHubSystemConsumer.java | 56 ++++++++++++++++++-
.../producer/SwapFirstLastByteInterceptor.java | 36 +++++++++++++
.../producer/TestEventHubSystemProducer.java | 57 ++++++++++++++++++++
4 files changed, 149 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index c8c5538..505421c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -271,7 +271,7 @@ public class EventHubSystemProducer implements SystemProducer {
}
}
- private EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) {
+ protected EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) {
Optional<Interceptor> interceptor = Optional.ofNullable(interceptors.getOrDefault(streamName, null));
byte[] eventValue = (byte[]) envelope.getMessage();
if (interceptor.isPresent()) {
http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index b89c805..a25a3b6 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -27,6 +27,7 @@ import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.eventhub.*;
import org.apache.samza.system.eventhub.admin.PassThroughInterceptor;
+import org.apache.samza.system.eventhub.producer.SwapFirstLastByteInterceptor;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -46,12 +47,16 @@ public class TestEventHubSystemConsumer {
private static final String MOCK_ENTITY_2 = "mocktopic2";
private void verifyEvents(List<IncomingMessageEnvelope> messages, List<EventData> eventDataList) {
+ verifyEvents(messages, eventDataList, new PassThroughInterceptor());
+ }
+ // Verifies key, offset and payload per envelope; payload bytes are compared by content against interceptor output.
+ private void verifyEvents(List<IncomingMessageEnvelope> messages, List<EventData> eventDataList, Interceptor interceptor) {
Assert.assertEquals(messages.size(), eventDataList.size());
for (int i = 0; i < messages.size(); i++) {
IncomingMessageEnvelope message = messages.get(i);
EventData eventData = eventDataList.get(i);
Assert.assertEquals(message.getKey(), eventData.getSystemProperties().getPartitionKey());
- Assert.assertEquals(message.getMessage(), eventData.getBytes());
+ Assert.assertArrayEquals(interceptor.intercept(eventData.getBytes()), (byte[]) message.getMessage());
Assert.assertEquals(message.getOffset(), eventData.getSystemProperties().getOffset());
}
}
@@ -144,6 +149,55 @@ public class TestEventHubSystemConsumer {
}
@Test
+ public void testSinglePartitionConsumptionInterceptor() throws Exception {
+ // Single-partition consumption with a per-stream interceptor: checks delivered payloads and read metrics.
+ String systemName = "eventhubs";
+ String streamName = "testStream";
+ int numEvents = 10; // needs to be less than BLOCKING_QUEUE_SIZE
+ int partitionId = 0;
+ Interceptor interceptor = new SwapFirstLastByteInterceptor();
+
+ TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+ Map<SystemStreamPartition, List<EventData>> eventData = new HashMap<>();
+ SystemStreamPartition ssp = new SystemStreamPartition(systemName, streamName, new Partition(partitionId));
+ Map<String, Interceptor> interceptors = new HashMap<>();
+ interceptors.put(streamName, interceptor);
+
+ // create EventData
+ List<EventData> singlePartitionEventData = MockEventData.generateEventData(numEvents);
+ eventData.put(ssp, singlePartitionEventData);
+
+ // Set configs
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
+
+ MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
+
+ EventHubSystemConsumer consumer =
+ new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptors,
+ testMetrics);
+ consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM);
+ consumer.start();
+
+ // Mock received data from EventHub
+ eventHubClientWrapperFactory.sendToHandlers(consumer.streamPartitionHandlers);
+
+ List<IncomingMessageEnvelope> result = consumer.poll(Collections.singleton(ssp), 1000).get(ssp);
+ verifyEvents(result, singlePartitionEventData, interceptor);
+ // JUnit's assertEquals takes (expected, actual); keep that order so failure messages read correctly.
+ Assert.assertEquals(3, testMetrics.getCounters(streamName).size());
+ Assert.assertEquals(2, testMetrics.getGauges(streamName).size());
+ Map<String, Counter> counters =
+ testMetrics.getCounters(streamName).stream().collect(Collectors.toMap(Counter::getName, Function.identity()));
+ Assert.assertEquals(numEvents, counters.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount());
+ Assert.assertEquals(0, counters.get(EventHubSystemConsumer.READ_ERRORS).getCount());
+ }
+
+ @Test
public void testMultiPartitionConsumptionHappyPath() throws Exception {
String systemName = "eventhubs";
String streamName = "testStream";
http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java
new file mode 100644
index 0000000..912e087
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.producer;
+
+import org.apache.samza.system.eventhub.Interceptor;
+
+/** Test-only Interceptor that exchanges the first and last bytes of the given array in place. */
+public class SwapFirstLastByteInterceptor implements Interceptor {
+ @Override
+ public byte[] intercept(byte[] bytes) {
+ // Arrays shorter than two bytes have nothing to exchange and are returned as-is.
+ if (bytes.length >= 2) {
+ byte first = bytes[0];
+ bytes[0] = bytes[bytes.length - 1];
+ bytes[bytes.length - 1] = first;
+ }
+ return bytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index bf62e92..10016ec 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -102,6 +102,63 @@ public class TestEventHubSystemProducer {
}
@Test
+ public void testSendingToSpecificPartitionsWithInterceptor() throws Exception {
+ // Sends messages to two explicit partitions through a per-stream interceptor and checks the payloads sent.
+ String systemName = "eventhubs";
+ String streamName = "testStream";
+ int numEvents = 10;
+ int partitionId0 = 0;
+ int partitionId1 = 1;
+ Interceptor interceptor = new SwapFirstLastByteInterceptor();
+ TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+ Map<String, Interceptor> interceptors = new HashMap<>();
+ interceptors.put(streamName, interceptor);
+
+ List<String> outgoingMessagesP0 = generateMessages(numEvents);
+ List<String> outgoingMessagesP1 = generateMessages(numEvents);
+
+ // Set configs
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
+ configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
+ configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
+ PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+
+ MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
+
+ EventHubSystemProducer producer =
+ new EventHubSystemProducer(new EventHubConfig(configMap), systemName, factory, interceptors, testMetrics);
+
+ SystemStream systemStream = new SystemStream(systemName, streamName);
+ producer.register(streamName);
+ producer.start();
+
+ outgoingMessagesP0.forEach(message ->
+ producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+ outgoingMessagesP1.forEach(message ->
+ producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+
+ // Retrieve sent data
+ List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0)
+ .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+ List<String> receivedData1 = factory.getSentData(systemName, streamName, partitionId1)
+ .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+
+ List<String> expectedP0 = outgoingMessagesP0.stream()
+ .map(message -> new String(interceptor.intercept(message.getBytes())))
+ .collect(Collectors.toList());
+ List<String> expectedP1 = outgoingMessagesP1.stream()
+ .map(message -> new String(interceptor.intercept(message.getBytes())))
+ .collect(Collectors.toList());
+ // assertEquals (rather than assertTrue on equals) reports both lists when the comparison fails.
+ Assert.assertEquals(expectedP0, receivedData0);
+ Assert.assertEquals(expectedP1, receivedData1);
+ }
+
+ @Test
public void testSendingToEventHubHashing() throws Exception {
String systemName = "eventhubs";
String streamName = "testStream";