Posted to commits@samza.apache.org by xi...@apache.org on 2018/03/27 18:40:00 UTC

samza git commit: Skipping large messages in the EventHub system producer

Repository: samza
Updated Branches:
  refs/heads/master b0603c3c7 -> aff805d07


Skipping large messages in the EventHub system producer

EventHubs has a restriction on the maximum message size it will accept. This adds a `systems.%s.eventhubs.skipMessagesLargerThanBytes` config that makes the EventHub system producer skip messages larger than the configured number of bytes, so we don't even attempt to send messages that EventHubs would reject anyway.
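
For a job using an EventHub system named "eventhub-system" (the system name and byte value below are illustrative, not part of this patch), the new knob would be set in the job config like this:

    # Skip any outgoing message larger than 512 KB instead of sending it to EventHubs
    systems.eventhub-system.eventhubs.skipMessagesLargerThanBytes=524288

A non-positive value (e.g. -1) disables the size check, and when the property is not set the producer defaults to skipping messages larger than 1 MB (1024 * 1024 bytes).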

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #454 from srinipunuru/skip.2


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/aff805d0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/aff805d0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/aff805d0

Branch: refs/heads/master
Commit: aff805d07e8fc6b50d88d4e78132fa8bd7c3ad5f
Parents: b0603c3
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Tue Mar 27 11:39:52 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Mar 27 11:39:52 2018 -0700

----------------------------------------------------------------------
 .../samza/system/eventhub/EventHubConfig.java   |  15 ++
 .../producer/EventHubSystemProducer.java        |  95 ++++++----
 .../producer/TestEventHubSystemProducer.java    | 186 +++++++++++++++----
 samza-azure/src/test/resources/log4j.xml        |  43 +++++
 4 files changed, 267 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/aff805d0/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index e9c383a..7df92c0 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -44,6 +44,8 @@ public class EventHubConfig extends MapConfig {
 
   public static final String CONFIG_STREAM_SAS_TOKEN = Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.token";
 
+  public static final String CONFIG_SKIP_MESSAGES_LARGER_THAN = "systems.%s.eventhubs.skipMessagesLargerThanBytes";
+
   public static final String CONFIG_STREAM_CONSUMER_GROUP = "streams.%s.eventhubs.consumer.group";
   public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
 
@@ -60,6 +62,9 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = "systems.%s.eventhubs.receive.queue.size";
   public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100;
 
+  // By default we will skip messages larger than 1MB.
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024;
+
   private final Map<String, String> physcialToId = new HashMap<>();
 
   public EventHubConfig(Config config) {
@@ -139,6 +144,16 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
+   * Get the max message size above which the EventHub system producer skips messages
+   *
+   * @param systemName name of the system
+   * @return the configured max message size, in bytes
+   */
+  public Integer getSkipMessagesLargerThan(String systemName) {
+    return getInt(String.format(CONFIG_SKIP_MESSAGES_LARGER_THAN, systemName), DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  /**
    * Get the EventHubs SAS (Shared Access Signature) key name for the stream
    *
    * @param systemName name of the system

http://git-wip-us.apache.org/repos/asf/samza/blob/aff805d0/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index f294751..258b434 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -23,19 +23,6 @@ import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.PartitionSender;
 import com.microsoft.azure.servicebus.ServiceBusException;
-import org.apache.samza.SamzaException;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.eventhub.EventHubClientManager;
-import org.apache.samza.system.eventhub.EventHubConfig;
-import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
-import org.apache.samza.system.eventhub.Interceptor;
-import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -48,9 +35,22 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.eventhub.EventHubClientManager;
+import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.Interceptor;
+import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 public class EventHubSystemProducer implements SystemProducer {
   private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemProducer.class.getName());
@@ -61,11 +61,13 @@ public class EventHubSystemProducer implements SystemProducer {
   public static final String KEY = "key";
 
   // Metrics recording
+  private static final String EVENT_SKIP_RATE = "eventSkipRate";
   private static final String EVENT_WRITE_RATE = "eventWriteRate";
   private static final String EVENT_BYTE_WRITE_RATE = "eventByteWriteRate";
   private static final String SEND_ERRORS = "sendErrors";
   private static final String SEND_LATENCY = "sendLatency";
   private static final String SEND_CALLBACK_LATENCY = "sendCallbackLatency";
+  private static Counter aggEventSkipRate = null;
   private static Counter aggEventWriteRate = null;
   private static Counter aggEventByteWriteRate = null;
   private static Counter aggSendErrors = null;
@@ -76,11 +78,10 @@ public class EventHubSystemProducer implements SystemProducer {
   private static final Object AGGREGATE_METRICS_LOCK = new Object();
 
   public enum PartitioningMethod {
-    ROUND_ROBIN,
-    EVENT_HUB_HASHING,
-    PARTITION_KEY_AS_PARTITION
+    ROUND_ROBIN, EVENT_HUB_HASHING, PARTITION_KEY_AS_PARTITION
   }
 
+  private final HashMap<String, Counter> eventSkipRate = new HashMap<>();
   private final HashMap<String, Counter> eventWriteRate = new HashMap<>();
   private final HashMap<String, Counter> eventByteWriteRate = new HashMap<>();
   private final HashMap<String, SamzaHistogram> sendLatency = new HashMap<>();
@@ -91,6 +92,7 @@ public class EventHubSystemProducer implements SystemProducer {
   private final MetricsRegistry registry;
   private final PartitioningMethod partitioningMethod;
   private final String systemName;
+  private final int maxMessageSize;
 
   private final AtomicReference<Throwable> sendExceptionOnCallback = new AtomicReference<>(null);
   private volatile boolean isStarted = false;
@@ -104,20 +106,23 @@ public class EventHubSystemProducer implements SystemProducer {
   private final Set<CompletableFuture<Void>> pendingFutures = ConcurrentHashMap.newKeySet();
 
   public EventHubSystemProducer(EventHubConfig config, String systemName,
-                                EventHubClientManagerFactory eventHubClientManagerFactory,
-                                Map<String, Interceptor> interceptors, MetricsRegistry registry) {
+      EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors,
+      MetricsRegistry registry) {
+    LOG.info("Creating EventHub Producer for system {}", systemName);
     this.config = config;
     this.registry = registry;
     this.systemName = systemName;
     this.partitioningMethod = config.getPartitioningMethod(systemName);
     this.interceptors = interceptors;
+    this.maxMessageSize = config.getSkipMessagesLargerThan(systemName);
 
     // Fetches the stream ids
     List<String> streamIds = config.getStreams(systemName);
 
     // Create and initiate connections to Event Hubs
     for (String streamId : streamIds) {
-      EventHubClientManager ehClient = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
+      EventHubClientManager ehClient =
+          eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
       eventHubClients.put(streamId, ehClient);
       ehClient.init();
     }
@@ -125,6 +130,7 @@ public class EventHubSystemProducer implements SystemProducer {
 
   @Override
   public synchronized void register(String source) {
+    LOG.info("Registering source {}", source);
     if (isStarted) {
       String msg = "Cannot register once the producer is started.";
       throw new SamzaException(msg);
@@ -133,7 +139,7 @@ public class EventHubSystemProducer implements SystemProducer {
 
   @Override
   public synchronized void start() {
-    LOG.debug("Starting system producer.");
+    LOG.info("Starting system producer.");
 
     // Create partition senders if required
     if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
@@ -144,8 +150,8 @@ public class EventHubSystemProducer implements SystemProducer {
           try {
             Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
             long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
-            Integer numPartitions = ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS)
-                    .getPartitionCount();
+            Integer numPartitions =
+                ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
 
             for (int i = 0; i < numPartitions; i++) { // 32 partitions max
               String partitionId = String.valueOf(i);
@@ -166,6 +172,7 @@ public class EventHubSystemProducer implements SystemProducer {
 
     // Initiate metrics
     eventHubClients.keySet().forEach((streamId) -> {
+        eventSkipRate.put(streamId, registry.newCounter(streamId, EVENT_SKIP_RATE));
         eventWriteRate.put(streamId, registry.newCounter(streamId, EVENT_WRITE_RATE));
         eventByteWriteRate.put(streamId, registry.newCounter(streamId, EVENT_BYTE_WRITE_RATE));
         sendLatency.put(streamId, new SamzaHistogram(registry, streamId, SEND_LATENCY));
@@ -176,6 +183,7 @@ public class EventHubSystemProducer implements SystemProducer {
     // Locking to ensure that these aggregated metrics will be created only once across multiple system producers.
     synchronized (AGGREGATE_METRICS_LOCK) {
       if (aggEventWriteRate == null) {
+        aggEventSkipRate = registry.newCounter(AGGREGATE, EVENT_SKIP_RATE);
         aggEventWriteRate = registry.newCounter(AGGREGATE, EVENT_WRITE_RATE);
         aggEventByteWriteRate = registry.newCounter(AGGREGATE, EVENT_BYTE_WRITE_RATE);
         aggSendLatency = new SamzaHistogram(registry, AGGREGATE, SEND_LATENCY);
@@ -204,7 +212,17 @@ public class EventHubSystemProducer implements SystemProducer {
     checkCallbackThrowable("Received exception on message send");
 
     EventData eventData = createEventData(streamId, envelope);
-    int eventDataLength =  eventData.getBytes() == null ? 0 : eventData.getBytes().length;
+    int eventDataLength = eventData.getBytes() == null ? 0 : eventData.getBytes().length;
+
+    // A maxMessageSize of zero or less means there is no message size restriction.
+    if (this.maxMessageSize > 0 && eventDataLength > this.maxMessageSize) {
+      LOG.info("Received a message with size {} > maxMessageSize configured ({}), skipping it",
+          eventDataLength, this.maxMessageSize);
+      eventSkipRate.get(streamId).inc();
+      aggEventSkipRate.inc();
+      return;
+    }
+
     eventWriteRate.get(streamId).inc();
     aggEventWriteRate.inc();
     eventByteWriteRate.get(streamId).inc(eventDataLength);
@@ -214,8 +232,8 @@ public class EventHubSystemProducer implements SystemProducer {
     long beforeSendTimeMs = System.currentTimeMillis();
 
     // Async send call
-    CompletableFuture<Void> sendResult = sendToEventHub(streamId, eventData, getEnvelopePartitionId(envelope),
-            ehClient.getEventHubClient());
+    CompletableFuture<Void> sendResult =
+        sendToEventHub(streamId, eventData, getEnvelopePartitionId(envelope), ehClient.getEventHubClient());
 
     long afterSendTimeMs = System.currentTimeMillis();
     long latencyMs = afterSendTimeMs - beforeSendTimeMs;
@@ -240,7 +258,7 @@ public class EventHubSystemProducer implements SystemProducer {
   }
 
   private CompletableFuture<Void> sendToEventHub(String streamId, EventData eventData, Object partitionKey,
-                                                 EventHubClient eventHubClient) {
+      EventHubClient eventHubClient) {
     if (PartitioningMethod.ROUND_ROBIN.equals(partitioningMethod)) {
       return eventHubClient.send(eventData);
     } else if (PartitioningMethod.EVENT_HUB_HASHING.equals(partitioningMethod)) {
@@ -276,7 +294,7 @@ public class EventHubSystemProducer implements SystemProducer {
     } else if (partitionKey instanceof byte[]) {
       return new String((byte[]) partitionKey, Charset.defaultCharset());
     } else {
-      throw new SamzaException("Unsupported key type: " +  partitionKey.getClass().toString());
+      throw new SamzaException("Unsupported key type: " + partitionKey.getClass().toString());
     }
   }
 
@@ -295,7 +313,7 @@ public class EventHubSystemProducer implements SystemProducer {
       String keyValue = "";
       if (envelope.getKey() != null) {
         keyValue = (envelope.getKey() instanceof byte[]) ? new String((byte[]) envelope.getKey())
-                : envelope.getKey().toString();
+            : envelope.getKey().toString();
       }
       eventData.getProperties().put(KEY, keyValue);
     }
@@ -304,27 +322,30 @@ public class EventHubSystemProducer implements SystemProducer {
 
   @Override
   public synchronized void flush(String source) {
-    LOG.debug("Trying to flush pending {} sends messages.", pendingFutures.size());
-    checkCallbackThrowable("Received exception on message send");
-
-    CompletableFuture<Void> future = CompletableFuture
-              .allOf(pendingFutures.toArray(new CompletableFuture[pendingFutures.size()]));
+    long incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
+    LOG.info("Trying to flush pending {} sends.", incompleteSends);
+    checkCallbackThrowable("Received exception on message send.");
+    CompletableFuture<Void> future =
+        CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture[pendingFutures.size()]));
 
     try {
       // Block until all the pending sends are complete or timeout.
       future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
     } catch (InterruptedException | ExecutionException | TimeoutException e) {
-      String msg = "Flush failed with error";
+      incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
+      String msg = String.format("Flush failed with error. Total pending sends %d", incompleteSends);
+      LOG.error(msg, e);
       throw new SamzaException(msg, e);
     }
 
-    checkCallbackThrowable("Sending one or more of the messages failed during flush");
+    checkCallbackThrowable("Sending one or more of the messages failed during flush.");
   }
 
   private void checkCallbackThrowable(String msg) {
     // Check for send errors from EventHub side
     Throwable sendThrowable = sendExceptionOnCallback.get();
     if (sendThrowable != null) {
+      LOG.error(msg, sendThrowable);
       throw new SamzaException(msg, sendThrowable);
     }
     pendingFutures.clear();
@@ -332,7 +353,7 @@ public class EventHubSystemProducer implements SystemProducer {
 
   @Override
   public synchronized void stop() {
-    LOG.debug("Stopping producer.", pendingFutures.size());
+    LOG.info("Stopping producer. Pending sends: {}", pendingFutures.size());
 
     streamPartitionSenders.values().forEach((streamPartitionSender) -> {
         List<CompletableFuture<Void>> futures = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/aff805d0/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index 8572e95..0e12d73 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -19,30 +19,37 @@
 
 package org.apache.samza.system.eventhub.producer;
 
-import com.microsoft.azure.eventhubs.*;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
+import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import org.apache.samza.system.eventhub.Interceptor;
 import org.apache.samza.system.eventhub.MockEventHubClientManagerFactory;
+import org.apache.samza.system.eventhub.TestMetricsRegistry;
 import org.apache.samza.system.eventhub.admin.PassThroughInterceptor;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
-import org.apache.samza.system.eventhub.TestMetricsRegistry;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.*;
-import java.util.stream.Collectors;
-
 import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
 
+
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class,
-        EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
+@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
 public class TestEventHubSystemProducer {
 
   private static final String SOURCE = "TestEventHubSystemProducer";
@@ -79,33 +86,134 @@ public class TestEventHubSystemProducer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
     configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
-            PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+        PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
 
     EventHubSystemProducer producer =
-            new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics);
+        new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics);
 
     SystemStream systemStream = new SystemStream(systemName, streamName);
     producer.register(SOURCE);
     producer.start();
 
-    outgoingMessagesP0.forEach(message ->
-            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
-    outgoingMessagesP1.forEach(message ->
-            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+    outgoingMessagesP0.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+    outgoingMessagesP1.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
 
     // Retrieve sent data
     List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0)
-            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
     List<String> receivedData1 = factory.getSentData(systemName, streamName, partitionId1)
-            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
 
     Assert.assertTrue(outgoingMessagesP0.equals(receivedData0));
     Assert.assertTrue(outgoingMessagesP1.equals(receivedData1));
   }
 
+
+  @Test
+  public void testSendingLargeMessage() throws Exception {
+    String systemName = "eventhubs";
+    String streamName = "testLMStream";
+    int numEvents = 10;
+    int partitionId0 = 0;
+
+    TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+    Map<String, Interceptor> interceptor = new HashMap<>();
+    interceptor.put(streamName, new PassThroughInterceptor());
+
+    List<String> outgoingMessagesP0 = generateMessages(numEvents / 2);
+    outgoingMessagesP0.add("1234567890123456789012345678901234567890");
+    outgoingMessagesP0.addAll(generateMessages(numEvents / 2));
+
+    // Set configs
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_SKIP_MESSAGES_LARGER_THAN, systemName), "30");
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
+        PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+    MapConfig config = new MapConfig(configMap);
+
+    MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
+
+    EventHubSystemProducer producer =
+        new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics);
+
+    SystemStream systemStream = new SystemStream(systemName, streamName);
+    producer.register(SOURCE);
+    producer.start();
+
+    outgoingMessagesP0.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+
+    // Retrieve sent data
+    List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0)
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(outgoingMessagesP0.size(), receivedData0.size() + 1);
+  }
+
+  @Test
+  public void testSkipLargeMessageCheck() throws Exception {
+    String systemName = "eventhubs";
+    String streamName = "testLMStream";
+    int numEvents = 10;
+    int partitionId0 = 0;
+
+    TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+    Map<String, Interceptor> interceptor = new HashMap<>();
+    interceptor.put(streamName, new PassThroughInterceptor());
+
+    List<String> outgoingMessagesP0 = generateMessages(numEvents / 2);
+    outgoingMessagesP0.add("1234567890123456789012345678901234567890");
+    outgoingMessagesP0.addAll(generateMessages(numEvents / 2));
+
+    // Set configs
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_SKIP_MESSAGES_LARGER_THAN, systemName), "-1");
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
+        PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+    MapConfig config = new MapConfig(configMap);
+
+    MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
+
+    EventHubSystemProducer producer =
+        new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics);
+
+    SystemStream systemStream = new SystemStream(systemName, streamName);
+    producer.register(SOURCE);
+    producer.start();
+
+    outgoingMessagesP0.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+
+    // Retrieve sent data
+    List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0)
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(outgoingMessagesP0.size(), receivedData0.size());
+  }
+
   @Test
   public void testSendingToSpecificPartitionsWithInterceptor() throws Exception {
     String systemName = "eventhubs";
@@ -130,35 +238,39 @@ public class TestEventHubSystemProducer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
     configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
-            PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+        PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
 
     EventHubSystemProducer producer =
-            new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptors, testMetrics);
+        new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptors, testMetrics);
 
     SystemStream systemStream = new SystemStream(systemName, streamName);
     producer.register(SOURCE);
     producer.start();
 
-    outgoingMessagesP0.forEach(message ->
-            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
-    outgoingMessagesP1.forEach(message ->
-            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+    outgoingMessagesP0.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+    outgoingMessagesP1.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
 
     // Retrieve sent data
     List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0)
-            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
     List<String> receivedData1 = factory.getSentData(systemName, streamName, partitionId1)
-            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
 
     List<String> expectedP0 = outgoingMessagesP0.stream()
-            .map(message -> new String(interceptor.intercept(message.getBytes())))
-            .collect(Collectors.toList());
+        .map(message -> new String(interceptor.intercept(message.getBytes())))
+        .collect(Collectors.toList());
     List<String> expectedP1 = outgoingMessagesP1.stream()
-            .map(message -> new String(interceptor.intercept(message.getBytes())))
-            .collect(Collectors.toList());
+        .map(message -> new String(interceptor.intercept(message.getBytes())))
+        .collect(Collectors.toList());
 
     Assert.assertTrue(expectedP0.equals(receivedData0));
     Assert.assertTrue(expectedP1.equals(receivedData1));
@@ -189,28 +301,32 @@ public class TestEventHubSystemProducer {
 
     // mod 2 on the partitionid to simulate consistent hashing
     configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
-            PartitioningMethod.EVENT_HUB_HASHING.toString());
+        PartitioningMethod.EVENT_HUB_HASHING.toString());
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory();
 
     EventHubSystemProducer producer =
-            new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics);
+        new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics);
 
     SystemStream systemStream = new SystemStream(systemName, streamName);
     producer.register(SOURCE);
     producer.start();
 
-    outgoingMessagesP0.forEach(message ->
-            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
-    outgoingMessagesP1.forEach(message ->
-            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+    outgoingMessagesP0.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+    outgoingMessagesP1.forEach(message -> producer.send(SOURCE,
+        new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
 
     // Retrieve sent data
     List<String> receivedData0 = factory.getSentData(systemName, streamName, 0)
-            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
     List<String> receivedData1 = factory.getSentData(systemName, streamName, 1)
-            .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList());
+        .stream()
+        .map(eventData -> new String(eventData.getBytes()))
+        .collect(Collectors.toList());
 
     Assert.assertTrue(outgoingMessagesP0.equals(receivedData0));
     Assert.assertTrue(outgoingMessagesP1.equals(receivedData1));

http://git-wip-us.apache.org/repos/asf/samza/blob/aff805d0/samza-azure/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/resources/log4j.xml b/samza-azure/src/test/resources/log4j.xml
new file mode 100644
index 0000000..6259b48
--- /dev/null
+++ b/samza-azure/src/test/resources/log4j.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  you under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  @log4j.appenders.webapp@
+
+  @log4j.appenders.public_access@
+
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+             value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+    </layout>
+  </appender>
+
+  @log4j.loggers.spring@
+
+  @log4j.loggers.public_access@
+  <logger name="org.apache" additivity="false">
+    <level value="DEBUG"/>
+    <appender-ref ref="console"/>
+  </logger>
+
+  @log4j.loggers.root@
+  <root>
+    <priority value ="DEBUG" />
+    <appender-ref ref="console"/>
+  </root>
+
+
+</log4j:configuration>
+