You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/08 01:15:32 UTC

samza git commit: SAMZA-1437; Added tests, make eventData creation extensible

Repository: samza
Updated Branches:
  refs/heads/master 52d8ddd6e -> 05113c339


SAMZA-1437; Added tests, make eventData creation extensible

vjagadish1989 Required for internal custom eventData creation

Author: Daniel Chen <29...@users.noreply.github.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #352 from dxichen/create-event-data-extensible


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/05113c33
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/05113c33
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/05113c33

Branch: refs/heads/master
Commit: 05113c3395f92b74e41c75b0ca28153fb3dbf915
Parents: 52d8ddd
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Tue Nov 7 17:15:28 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 7 17:15:28 2017 -0800

----------------------------------------------------------------------
 .../producer/EventHubSystemProducer.java        |  2 +-
 .../consumer/TestEventHubSystemConsumer.java    | 56 ++++++++++++++++++-
 .../producer/SwapFirstLastByteInterceptor.java  | 36 +++++++++++++
 .../producer/TestEventHubSystemProducer.java    | 57 ++++++++++++++++++++
 4 files changed, 149 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index c8c5538..505421c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -271,7 +271,7 @@ public class EventHubSystemProducer implements SystemProducer {
     }
   }
 
-  private EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) {
+  protected EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) {
     Optional<Interceptor> interceptor = Optional.ofNullable(interceptors.getOrDefault(streamName, null));
     byte[] eventValue = (byte[]) envelope.getMessage();
     if (interceptor.isPresent()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index b89c805..a25a3b6 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -27,6 +27,7 @@ import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.eventhub.*;
 import org.apache.samza.system.eventhub.admin.PassThroughInterceptor;
+import org.apache.samza.system.eventhub.producer.SwapFirstLastByteInterceptor;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,12 +47,16 @@ public class TestEventHubSystemConsumer {
   private static final String MOCK_ENTITY_2 = "mocktopic2";
 
   private void verifyEvents(List<IncomingMessageEnvelope> messages, List<EventData> eventDataList) {
+    verifyEvents(messages, eventDataList, new PassThroughInterceptor()); // delegate to the three-argument overload with a PassThroughInterceptor
+  }
+
+  private void verifyEvents(List<IncomingMessageEnvelope> messages, List<EventData> eventDataList, Interceptor interceptor) { // checks sizes match, then key, payload and offset of each message against the EventData at the same index
     Assert.assertEquals(messages.size(), eventDataList.size());
     for (int i = 0; i < messages.size(); i++) {
       IncomingMessageEnvelope message = messages.get(i);
       EventData eventData = eventDataList.get(i);
       Assert.assertEquals(message.getKey(), eventData.getSystemProperties().getPartitionKey());
-      Assert.assertEquals(message.getMessage(), eventData.getBytes());
+      Assert.assertEquals(message.getMessage(), interceptor.intercept(eventData.getBytes())); // NOTE(review): if getMessage() is a byte[], assertEquals compares array references, not contents - confirm whether assertArrayEquals was intended
       Assert.assertEquals(message.getOffset(), eventData.getSystemProperties().getOffset());
     }
   }
@@ -144,6 +149,55 @@ public class TestEventHubSystemConsumer {
   }
 
   @Test
+  public void testSinglePartitionConsumptionInterceptor() throws Exception {
+    String systemName = "eventhubs";
+    String streamName = "testStream";
+    int numEvents = 10; // needs to be less than BLOCKING_QUEUE_SIZE
+    int partitionId = 0;
+    Interceptor interceptor = new SwapFirstLastByteInterceptor();
+
+    TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+    Map<SystemStreamPartition, List<EventData>> eventData = new HashMap<>();
+    SystemStreamPartition ssp = new SystemStreamPartition(systemName, streamName, new Partition(partitionId));
+    Map<String, Interceptor> interceptors = new HashMap<>();
+    interceptors.put(streamName, interceptor);
+
+    // Generate numEvents mock EventData and map them to the single partition under test
+    List<EventData> singlePartitionEventData = MockEventData.generateEventData(numEvents);
+    eventData.put(ssp, singlePartitionEventData);
+
+    // Build the per-stream config: stream list, namespace, SAS key name, SAS token and entity path
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
+
+    MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
+
+    EventHubSystemConsumer consumer =
+            new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptors,
+                    testMetrics);
+    consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM);
+    consumer.start();
+
+    // Simulate receipt from EventHub by handing the mock data to the consumer's partition handlers
+    eventHubClientWrapperFactory.sendToHandlers(consumer.streamPartitionHandlers);
+
+    List<IncomingMessageEnvelope> result = consumer.poll(Collections.singleton(ssp), 1000).get(ssp);
+
+    verifyEvents(result, singlePartitionEventData, interceptor); // expected payloads are the mock bytes passed through the same interceptor
+    Assert.assertEquals(testMetrics.getCounters(streamName).size(), 3);
+    Assert.assertEquals(testMetrics.getGauges(streamName).size(), 2);
+    Map<String, Counter> counters =
+            testMetrics.getCounters(streamName).stream().collect(Collectors.toMap(Counter::getName, Function.identity()));
+
+    Assert.assertEquals(counters.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount(), numEvents);
+    Assert.assertEquals(counters.get(EventHubSystemConsumer.READ_ERRORS).getCount(), 0);
+  }
+
+  @Test
   public void testMultiPartitionConsumptionHappyPath() throws Exception {
     String systemName = "eventhubs";
     String streamName = "testStream";

http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java
new file mode 100644
index 0000000..912e087
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system.eventhub.producer;
+
+import org.apache.samza.system.eventhub.Interceptor;
+
+public class SwapFirstLastByteInterceptor implements Interceptor {
+
+  @Override
+  public byte[] intercept(byte[] bytes) {
+    // Swap the first and last bytes in place and return the same array (no copy is made); arrays shorter than two bytes are returned unchanged
+    if (bytes.length < 2)
+      return bytes;
+    byte tmp = bytes[bytes.length - 1];
+    bytes[bytes.length - 1] = bytes[0];
+    bytes[0] = tmp;
+    return bytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index bf62e92..10016ec 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -102,6 +102,63 @@ public class TestEventHubSystemProducer {
   }
 
   @Test
+  public void testSendingToSpecificPartitionsWithInterceptor() throws Exception {
+    String systemName = "eventhubs";
+    String streamName = "testStream";
+    int numEvents = 10;
+    int partitionId0 = 0;
+    int partitionId1 = 1;
+    Interceptor interceptor = new SwapFirstLastByteInterceptor();
+
+    TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+    Map<String, Interceptor> interceptors = new HashMap<>();
+    interceptors.put(streamName, interceptor);
+
+    List<String> outgoingMessagesP0 = generateMessages(numEvents);
+    List<String> outgoingMessagesP1 = generateMessages(numEvents);
+
+    // Build the per-stream config and select PARTITION_KEY_AS_PARTITION as the producer partitioning method
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
+            PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+
+    MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
+
+    EventHubSystemProducer producer =
+            new EventHubSystemProducer(new EventHubConfig(configMap), systemName, factory, interceptors, testMetrics);
+
+    SystemStream systemStream = new SystemStream(systemName, streamName);
+    producer.register(streamName);
+    producer.start();
+
+    outgoingMessagesP0.forEach(message ->
+            producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+    outgoingMessagesP1.forEach(message ->
+            producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+
+    // Read back what the mock factory recorded as sent for each partition, decoded to Strings
+    List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0)
+            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+    List<String> receivedData1 = factory.getSentData(systemName, streamName, partitionId1)
+            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+
+    List<String> expectedP0 = outgoingMessagesP0.stream() // expected = each outgoing message's bytes after the same interceptor is applied
+            .map(message -> new String(interceptor.intercept(message.getBytes())))
+            .collect(Collectors.toList());
+    List<String> expectedP1 = outgoingMessagesP1.stream()
+            .map(message -> new String(interceptor.intercept(message.getBytes())))
+            .collect(Collectors.toList());
+
+    Assert.assertTrue(expectedP0.equals(receivedData0));
+    Assert.assertTrue(expectedP1.equals(receivedData1));
+  }
+
+  @Test
   public void testSendingToEventHubHashing() throws Exception {
     String systemName = "eventhubs";
     String streamName = "testStream";