Posted to commits@samza.apache.org by ca...@apache.org on 2019/07/31 16:40:27 UTC

[samza] branch master updated: SAMZA-2174: Throw a record too large exception for oversized records in changelog (#1008)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8f773  SAMZA-2174: Throw a record too large exception for oversized records in changelog (#1008)
7a8f773 is described below

commit 7a8f773de9f5a07a65e591e124e95c1aa47165ef
Author: Pawas Chhokra <pc...@linkedin.com>
AuthorDate: Wed Jul 31 09:40:21 2019 -0700

    SAMZA-2174: Throw a record too large exception for oversized records in changelog (#1008)
    
    * Throw a record too large exception for oversized records in the changelog
    * Change implementation to handle large messages in CachedStore based on user defined configs
    * Address review and change new Scala classes to Java
    * Address review and add a test case
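
    For illustration only (not part of this commit): a minimal Java sketch of how the three
    new properties could be set and read through StorageConfig, assuming a hypothetical
    store named "my-store" and the accessors added below.

        import com.google.common.collect.ImmutableMap;
        import org.apache.samza.config.MapConfig;
        import org.apache.samza.config.StorageConfig;

        public class LargeMessageConfigSketch {
          public static void main(String[] args) {
            // Hypothetical job config: cap changelog messages at 512 KB and drop anything larger.
            MapConfig config = new MapConfig(ImmutableMap.of(
                "stores.my-store.changelog.max.message.size.bytes", "524288",
                "stores.my-store.drop.large.messages", "true"));
            StorageConfig storageConfig = new StorageConfig(config);

            // Accessors added in this commit; defaults are 1048576 / false / false when unset.
            int maxSize = storageConfig.getChangelogMaxMsgSizeBytes("my-store");   // 524288
            boolean disallow = storageConfig.getDisallowLargeMessages("my-store"); // false (unset)
            boolean drop = storageConfig.getDropLargeMessages("my-store");         // true
            System.out.println(maxSize + " " + disallow + " " + drop);
          }
        }
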
---
 .../versioned/jobs/configuration-table.html        |  43 ++++
 .../versioned/jobs/samza-configurations.md         |   3 +
 .../org/apache/samza/config/StorageConfig.java     |  18 ++
 .../org/apache/samza/config/TestStorageConfig.java |  30 +++
 .../samza/storage/kv/LargeMessageSafeStore.java    | 150 +++++++++++++
 .../kv/BaseKeyValueStorageEngineFactory.scala      |  70 ++++--
 .../org/apache/samza/storage/kv/CachedStore.scala  |   3 +-
 .../storage/kv/LargeMessageSafeStoreMetrics.scala  |  30 +++
 .../org/apache/samza/storage/kv/LoggedStore.scala  |  22 +-
 .../samza/storage/kv/RecordTooLargeException.java  |  29 +++
 .../storage/kv/TestLargeMessageSafeStore.java      | 195 +++++++++++++++++
 .../kv/TestLargeMessageSafeKeyValueStores.java     | 240 +++++++++++++++++++++
 12 files changed, 804 insertions(+), 29 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index b69f214..546a3c3 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1706,6 +1706,49 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="stores-changelog-max-message-size-bytes">stores.<span class="store">store-name</span>.changelog.max.message.size.bytes</td>
+                    <td class="default">1048576</td>
+                    <td class="description">
+                        This property sets the maximum size, in bytes, of any message written to the changelog.
+                        The default value is 1048576 bytes (1 MB).
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="stores-disallow-large-messages">stores.<span class="store">store-name</span>.disallow.large.messages</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        When this property is enabled, Samza disallows large messages from being put into the store:
+                        any message larger than
+                        <a href="#stores-changelog-max-message-size-bytes" class="property">stores.*.changelog.max.message.size.bytes</a>
+                        causes a SamzaException to be thrown, stating that the record is too large.
+                        When a CachedStore is used, the message is serialized first, its size is validated,
+                        and it is cached only if its size is within the permissible limit.
+                        Note that enabling this on an existing job may cause a performance regression due to the pre-caching serialization.
+                        When this property is enabled, the <a href="#stores-drop-large-messages" class="property">stores.*.drop.large.messages</a>
+                        configuration is ignored. The default value for this config is false. When this property is not set,
+                        <a href="#stores-drop-large-messages" class="property">stores.*.drop.large.messages</a>
+                        determines how large messages are handled.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="stores-drop-large-messages">stores.<span class="store">store-name</span>.drop.large.messages</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        When this property is enabled, messages larger than
+                        <a href="#stores-changelog-max-message-size-bytes" class="property">stores.*.changelog.max.message.size.bytes</a>
+                        are not written to the underlying store or the changelog. No exception is thrown when a large message is encountered.
+                        If caching is enabled together with this property (see the
+                        <a href="#stores-rocksdb-object-cache-size" class="property">stores.*.object.cache.size</a>
+                        config), a large message is stored in the cache but is not written to the
+                        changelog or the underlying store, leaving the cache temporarily inconsistent with the store.
+                        When this property is disabled, large messages are sent to the changelog topic as-is,
+                        which may cause the container to fail during commit.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="keyvalue-rocksdb">
                         Using RocksDB for key-value storage<br>
                         <span class="subtitle">
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index d24efc2..cde7594 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -253,6 +253,9 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 |stores.**_store-name_**.key.serde| |If the storage engine expects keys in the store to be simple byte arrays, this [serde](../container/serialization.html) allows the stream task to access the store using another object type as key. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, keys are passed unmodified to the storage engine (and the changelog stream, if appropriate).|
 |stores.**_store-name_**.msg.serde| |If the storage engine expects values in the store to be simple byte arrays, this [serde](../container/serialization.html) allows the stream task to access the store using another object type as value. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, values are passed unmodified to the storage engine (and the changelog stream, if appropriate).|
 |stores.**_store-name_**.changelog| |Samza stores are local to a container. If the container fails, the contents of the store are lost. To prevent loss of data, you need to set this property to configure a changelog stream: Samza then ensures that writes to the store are replicated to this stream, and the store is restored from this stream after a failure. The value of this property is given in the form system-name.stream-name. The "system-name" part is optional. If it is omitted you mus [...]
+|stores.**_store-name_**.changelog.max.message.size.bytes|1048576|This property sets the maximum size, in bytes, of any message written to the changelog. The default value is 1048576 bytes (1 MB).|
+|stores.**_store-name_**.disallow.large.messages|false|When this property is enabled, Samza disallows large messages from being put into the store: any message larger than `stores.*.changelog.max.message.size.bytes` causes a SamzaException to be thrown, stating that the record is too large. When a CachedStore is used, the message is serialized first, its size is validated, and it is cached only if the size is under the permissibl [...]
+|stores.**_store-name_**.drop.large.messages|false|When this property is enabled, messages larger than `stores.*.changelog.max.message.size.bytes` are not written to the underlying store or the changelog, and no exception is thrown when a large message is encountered. If caching is enabled together with this property (see the `stores.*.object.cache.size` config for reference), the large message is stored in the cache but is not written  [...]
 |stores.**_store-name_**.rocksdb.ttl.ms| |__For RocksDB:__ The time-to-live of the store. Please note it's not a strict TTL limit (removed only after compaction). Please use caution opening a database with and without TTL, as it might corrupt the database. Please make sure to read the [constraints](https://github.com/facebook/rocksdb/wiki/Time-to-Live) before using.|
 |job.logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for changelog stores used by Samza application. Another way to configure the base directory is by setting environment variable `LOGGED_STORE_BASE_DIR`. __Note:__ The environment variable takes precedence over `job.logged.store.base.dir`. <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity shoul [...]
 |job.non-logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for non-changelog stores used by Samza application. <br>In YARN, the default behaviour without the configuration is to create non-changelog store directories in CWD which happens to be the YARN container directory. This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config `yarn.nodemanager.delete. [...]
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 5146b26..0bb9b99 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -46,6 +46,12 @@ public class StorageConfig extends MapConfig {
   public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX;
   public static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
   public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor";
+  public static final String CHANGELOG_MAX_MSG_SIZE_BYTES = STORE_PREFIX + "%s.changelog.max.message.size.bytes";
+  public static final int DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES = 1048576;
+  public static final String DISALLOW_LARGE_MESSAGES = STORE_PREFIX + "%s.disallow.large.messages";
+  public static final boolean DEFAULT_DISALLOW_LARGE_MESSAGES = false;
+  public static final String DROP_LARGE_MESSAGES = STORE_PREFIX + "%s.drop.large.messages";
+  public static final boolean DEFAULT_DROP_LARGE_MESSAGES = false;
 
   static final String CHANGELOG_SYSTEM = "job.changelog.system";
   static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
@@ -187,6 +193,18 @@ public class StorageConfig extends MapConfig {
     return getLong(String.format(CHANGELOG_DELETE_RETENTION_MS, storeName), DEFAULT_CHANGELOG_DELETE_RETENTION_MS);
   }
 
+  public int getChangelogMaxMsgSizeBytes(String storeName) {
+    return getInt(String.format(CHANGELOG_MAX_MSG_SIZE_BYTES, storeName), DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES);
+  }
+
+  public boolean getDisallowLargeMessages(String storeName) {
+    return getBoolean(String.format(DISALLOW_LARGE_MESSAGES, storeName), DEFAULT_DISALLOW_LARGE_MESSAGES);
+  }
+
+  public boolean getDropLargeMessages(String storeName) {
+    return getBoolean(String.format(DROP_LARGE_MESSAGES, storeName), DEFAULT_DROP_LARGE_MESSAGES);
+  }
+
   /**
    * Helper method to check if a system has a changelog attached to it.
    */
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 32f8ebe..2cde5df 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -251,4 +251,34 @@ public class TestStorageConfig {
             String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream")));
     assertTrue(storageConfig.hasDurableStores());
   }
+
+  @Test
+  public void testGetChangelogMaxMsgSizeBytes() {
+    // empty config, return default size
+    assertEquals(StorageConfig.DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES, new StorageConfig(new MapConfig()).getChangelogMaxMsgSizeBytes(STORE_NAME0));
+
+    StorageConfig storageConfig = new StorageConfig(
+        new MapConfig(ImmutableMap.of(String.format(StorageConfig.CHANGELOG_MAX_MSG_SIZE_BYTES, STORE_NAME0), "10")));
+    assertEquals(10, storageConfig.getChangelogMaxMsgSizeBytes(STORE_NAME0));
+  }
+
+  @Test
+  public void testGetDisallowLargeMessages() {
+    // empty config, return default value
+    assertEquals(StorageConfig.DEFAULT_DISALLOW_LARGE_MESSAGES, new StorageConfig(new MapConfig()).getDisallowLargeMessages(STORE_NAME0));
+
+    StorageConfig storageConfig = new StorageConfig(
+        new MapConfig(ImmutableMap.of(String.format(StorageConfig.DISALLOW_LARGE_MESSAGES, STORE_NAME0), "true")));
+    assertEquals(true, storageConfig.getDisallowLargeMessages(STORE_NAME0));
+  }
+
+  @Test
+  public void testGetDropLargeMessages() {
+    // empty config, return default value
+    assertEquals(StorageConfig.DEFAULT_DROP_LARGE_MESSAGES, new StorageConfig(new MapConfig()).getDropLargeMessages(STORE_NAME0));
+
+    StorageConfig storageConfig = new StorageConfig(
+        new MapConfig(ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME0), "true")));
+    assertEquals(true, storageConfig.getDropLargeMessages(STORE_NAME0));
+  }
 }
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
new file mode 100644
index 0000000..f53f9e8
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class checks for the size of the message being stored and either throws an exception when a large message is
+ * encountered or ignores the large message and continues processing, depending on how it is configured.
+ */
+public class LargeMessageSafeStore implements KeyValueStore<byte[], byte[]> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LargeMessageSafeStore.class);
+  private final KeyValueStore<byte[], byte[]> store;
+  private final String storeName;
+  private final boolean dropLargeMessages;
+  private final int maxMessageSize;
+  private final LargeMessageSafeStoreMetrics largeMessageSafeStoreMetrics;
+
+  public LargeMessageSafeStore(KeyValueStore<byte[], byte[]> store, String storeName, boolean dropLargeMessages, int maxMessageSize) {
+    this.store = store;
+    this.storeName = storeName;
+    this.dropLargeMessages = dropLargeMessages;
+    this.maxMessageSize = maxMessageSize;
+    this.largeMessageSafeStoreMetrics = new LargeMessageSafeStoreMetrics(storeName, new MetricsRegistryMap());
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return store.get(key);
+  }
+
+  /**
+   * This function puts a message in the store after validating its size.
+   * It drops the large message if it has been configured to do so.
+   * Otherwise, it throws an exception stating that a large message was encountered.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param value the value with which the specified {@code key} is to be associated.
+   */
+  @Override
+  public void put(byte[] key, byte[] value) {
+    validateMessageSize(value);
+    if (!isLargeMessage(value)) {
+      store.put(key, value);
+    } else {
+      LOG.info("Ignoring a large message with size " + value.length + " since it is greater than "
+          + "the maximum allowed value of " + maxMessageSize);
+      largeMessageSafeStoreMetrics.ignoredLargeMessages().inc();
+    }
+  }
+
+  /**
+   * This function puts messages in the store after validating their size.
+   * It drops any large messages in the entry list if it has been configured to do so.
+   * Otherwise, it throws an exception stating that a large message was encountered,
+   * and does not put any of the messages in the store.
+   *
+   * @param entries the updated mappings to put into this key-value store.
+   */
+  @Override
+  public void putAll(List<Entry<byte[], byte[]>> entries) {
+    entries.forEach(entry -> {
+        validateMessageSize(entry.getValue());
+      });
+    List<Entry<byte[], byte[]>> largeMessageSafeEntries = removeLargeMessages(entries);
+    store.putAll(largeMessageSafeEntries);
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    store.delete(key);
+  }
+
+  @Override
+  public void deleteAll(List<byte[]> keys) {
+    store.deleteAll(keys);
+  }
+
+  @Override
+  public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) {
+    return store.range(from, to);
+  }
+
+  @Override
+  public KeyValueSnapshot<byte[], byte[]> snapshot(byte[] from, byte[] to) {
+    return store.snapshot(from, to);
+  }
+
+  @Override
+  public KeyValueIterator<byte[], byte[]> all() {
+    return store.all();
+  }
+
+  @Override
+  public void close() {
+    store.close();
+  }
+
+  @Override
+  public void flush() {
+    store.flush();
+  }
+
+  private void validateMessageSize(byte[] message) {
+    if (!dropLargeMessages && isLargeMessage(message)) {
+      throw new RecordTooLargeException("The message size " + message.length + " for store " + storeName
+          + " was larger than the maximum allowed message size " + maxMessageSize + ".");
+    }
+  }
+
+  private boolean isLargeMessage(byte[] message) {
+    return message != null && message.length > maxMessageSize;
+  }
+
+  private List<Entry<byte[], byte[]>> removeLargeMessages(List<Entry<byte[], byte[]>> entries) {
+    List<Entry<byte[], byte[]>> largeMessageSafeEntries = new ArrayList<>();
+    entries.forEach(entry -> {
+        if (!isLargeMessage(entry.getValue())) {
+          largeMessageSafeEntries.add(entry);
+        } else {
+          LOG.info("Ignoring a large message with size " + entry.getValue().length + " since it is greater than "
+              + "the maximum allowed value of " + maxMessageSize);
+          largeMessageSafeStoreMetrics.ignoredLargeMessages().inc();
+        }
+      });
+    return largeMessageSafeEntries;
+  }
+}
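
For illustration only (not part of this commit): a minimal Java sketch of the two behaviors of
LargeMessageSafeStore, assuming an in-memory byte-array store as the delegate and a hypothetical
1024-byte limit.

    package org.apache.samza.storage.kv;

    import org.apache.samza.metrics.MetricsRegistryMap;
    import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;

    public class LargeMessageSafeStoreSketch {
      public static void main(String[] args) {
        KeyValueStore<byte[], byte[]> rawStore =
            new InMemoryKeyValueStore(new KeyValueStoreMetrics("example-store", new MetricsRegistryMap()));
        byte[] key = {1};
        byte[] tooLarge = new byte[2048]; // larger than the 1024-byte limit used below

        // dropLargeMessages = false: an oversized put fails fast with RecordTooLargeException.
        KeyValueStore<byte[], byte[]> strict =
            new LargeMessageSafeStore(rawStore, "example-store", false, 1024);
        try {
          strict.put(key, tooLarge);
        } catch (RecordTooLargeException e) {
          System.out.println("rejected: " + e.getMessage());
        }

        // dropLargeMessages = true: the oversized put is ignored and a metric is incremented.
        KeyValueStore<byte[], byte[]> lenient =
            new LargeMessageSafeStore(rawStore, "example-store", true, 1024);
        lenient.put(key, tooLarge);
        System.out.println("stored after drop? " + (rawStore.get(key) != null)); // false
      }
    }
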
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index b85ff1a..1978426 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -31,18 +31,19 @@ import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StorePrope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil}
+import org.apache.samza.util.{HighResolutionClock, Logging}
 
 /**
- * A key value storage engine factory implementation
- *
- * This trait encapsulates all the steps needed to create a key value storage engine. It is meant to be extended
- * by the specific key value store factory implementations which will in turn override the getKVStore method.
- */
+  * A key value storage engine factory implementation
+  *
+  * This trait encapsulates all the steps needed to create a key value storage engine. It is meant to be extended
+  * by the specific key value store factory implementations which will in turn override the getKVStore method.
+  */
 trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
 
   private val INMEMORY_KV_STORAGE_ENGINE_FACTORY =
     "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"
+
   /**
    * Return a KeyValueStore instance for the given store name,
    * which will be used as the underlying raw store
@@ -88,6 +89,10 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder()
     val accessLog = storageConfig.getAccessLogEnabled(storeName)
 
+    var maxMessageSize = storageConfig.getChangelogMaxMsgSizeBytes(storeName)
+    val disallowLargeMessages = storageConfig.getDisallowLargeMessages(storeName)
+    val dropLargeMessage = storageConfig.getDropLargeMessages(storeName)
+
     if (storeFactory.isEmpty) {
       throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!")
     }
@@ -123,22 +128,47 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
       new LoggedStore(rawStore, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
     }
 
-    // wrap with serialization
-    val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
-    val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde, serializedMetrics)
+    var toBeAccessLoggedStore: KeyValueStore[K, V] = null
 
-    // maybe wrap with caching
-    val maybeCachedStore = if (enableCache) {
-      val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
-      new CachedStore(serialized, cacheSize, batchSize, cachedStoreMetrics)
-    } else {
-      serialized
+    // If large messages are disallowed in config, then this creates a LargeMessageSafeKeyValueStore that throws a
+    // RecordTooLargeException when a large message is encountered.
+    if (disallowLargeMessages) {
+      // maybe wrap with caching
+      val maybeCachedStore = if (enableCache) {
+        createCachedStore(storeName, registry, maybeLoggedStore, cacheSize, batchSize)
+      } else {
+        maybeLoggedStore
+      }
+
+      // wrap with large message checking
+      val largeMessageSafeKeyValueStore = new LargeMessageSafeStore(maybeCachedStore, storeName, false, maxMessageSize)
+      // wrap with serialization
+      val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
+      toBeAccessLoggedStore = new SerializedKeyValueStore[K, V](largeMessageSafeKeyValueStore, keySerde, msgSerde, serializedMetrics)
+
+    }
+    else {
+      val toBeSerializedStore = if (dropLargeMessage) {
+        // wrap with large message checking
+        new LargeMessageSafeStore(maybeLoggedStore, storeName, dropLargeMessage, maxMessageSize)
+      } else {
+        maybeLoggedStore
+      }
+      // wrap with serialization
+      val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
+      val serializedStore = new SerializedKeyValueStore[K, V](toBeSerializedStore, keySerde, msgSerde, serializedMetrics)
+      // maybe wrap with caching
+      toBeAccessLoggedStore = if (enableCache) {
+        createCachedStore(storeName, registry, serializedStore, cacheSize, batchSize)
+      } else {
+        serializedStore
+      }
     }
 
     val maybeAccessLoggedStore = if (accessLog) {
-      new AccessLoggedStore(maybeCachedStore, collector, changeLogSystemStreamPartition, storageConfig, storeName, keySerde)
+      new AccessLoggedStore(toBeAccessLoggedStore, collector, changeLogSystemStreamPartition, storageConfig, storeName, keySerde)
     } else {
-      maybeCachedStore
+      toBeAccessLoggedStore
     }
 
     // wrap with null value checking
@@ -162,4 +192,10 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
       keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
   }
 
+  def createCachedStore[K, V](storeName: String, registry: MetricsRegistry,
+    underlyingStore: KeyValueStore[K, V], cacheSize: Int, batchSize: Int): KeyValueStore[K, V] = {
+    // wrap with caching
+    val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
+    new CachedStore(underlyingStore, cacheSize, batchSize, cachedStoreMetrics)
+  }
 }
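
For illustration only (not part of this commit): a rough Java sketch of the two wrapping orders the
factory now produces, mirroring the constructors exercised by the new tests. With
disallow.large.messages the size check sits between the serde and the cache, so every cached entry
is already serialized and validated; with drop.large.messages the cache stays outermost and the size
check sits just above the logged/raw store, which is why a dropped message can linger in the cache.

    package org.apache.samza.storage.kv;

    import org.apache.samza.metrics.MetricsRegistryMap;
    import org.apache.samza.serializers.Serde;
    import org.apache.samza.serializers.StringSerde;
    import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;

    public class WrappingOrderSketch {
      public static void main(String[] args) {
        MetricsRegistryMap registry = new MetricsRegistryMap();
        String storeName = "example-store";
        int maxMessageSize = 1024;
        Serde<String> serde = new StringSerde();
        KeyValueStore<byte[], byte[]> rawStore =
            new InMemoryKeyValueStore(new KeyValueStoreMetrics(storeName, registry));

        // disallow.large.messages = true: serde -> size check -> cache -> raw store.
        KeyValueStore<byte[], byte[]> cached =
            new CachedStore<>(rawStore, 1024, 1, new CachedStoreMetrics(storeName, registry));
        KeyValueStore<byte[], byte[]> sizeChecked =
            new LargeMessageSafeStore(cached, storeName, false, maxMessageSize);
        KeyValueStore<String, String> disallowChain = new SerializedKeyValueStore<>(
            sizeChecked, serde, serde, new SerializedKeyValueStoreMetrics(storeName, registry));

        // drop.large.messages = true: cache -> serde -> size check -> raw store.
        KeyValueStore<byte[], byte[]> dropping =
            new LargeMessageSafeStore(rawStore, storeName, true, maxMessageSize);
        KeyValueStore<String, String> serialized = new SerializedKeyValueStore<>(
            dropping, serde, serde, new SerializedKeyValueStoreMetrics(storeName, registry));
        KeyValueStore<String, String> dropChain =
            new CachedStore<>(serialized, 1024, 1, new CachedStoreMetrics(storeName, registry));

        disallowChain.put("key", "small value");
        dropChain.put("key", "small value");
      }
    }
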
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index fa8b1b2..959e49e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -27,7 +27,8 @@ import java.util.Arrays
  * A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold:
  * 1. Batch together writes to rocksdb, this turns out to be a great optimization
  * 2. Avoid duplicate writes and duplicate log entries within a commit interval. i.e. if there are two updates to the same key, log only the later.
- * 3. Avoid deserialization cost for gets on very common keys
+ * 3. Avoid deserialization cost for gets on very common keys (unless the keys themselves are byte arrays,
+ * in which case they don't need to be deserialized and there is no performance benefit).
  *
  * This caching does introduce a few odd corner cases :-(
  * 1. Items in the cache have pass-by-reference semantics but items in rocksdb have pass-by-value semantics. Modifying items after a put is a bad idea.
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LargeMessageSafeStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LargeMessageSafeStoreMetrics.scala
new file mode 100644
index 0000000..015f2da
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LargeMessageSafeStoreMetrics.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.{MetricsHelper, MetricsRegistry, MetricsRegistryMap}
+
+class LargeMessageSafeStoreMetrics(
+  val storeName: String,
+  val registry: MetricsRegistry) extends MetricsHelper {
+
+  val ignoredLargeMessages = newCounter("ignored-large-messages")
+
+  override def getPrefix = storeName + "-"
+}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index e5f4ca4..fa37b21 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -24,8 +24,8 @@ import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task.MessageCollector
 
 /**
- * A key/value store decorator that adds a changelog for any changes made to the underlying store
- */
+  * A key/value store decorator that adds a changelog for any changes made to the underlying store
+  */
 class LoggedStore[K, V](
   val store: KeyValueStore[K, V],
   val systemStreamPartition: SystemStreamPartition,
@@ -57,8 +57,8 @@ class LoggedStore[K, V](
   }
 
   /**
-   * Perform the local update and log it out to the changelog
-   */
+    * Perform the local update and log it out to the changelog
+    */
   def put(key: K, value: V) {
     metrics.puts.inc
     collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, value))
@@ -66,8 +66,8 @@ class LoggedStore[K, V](
   }
 
   /**
-   * Perform multiple local updates and log out all changes to the changelog
-   */
+    * Perform multiple local updates and log out all changes to the changelog
+    */
   def putAll(entries: java.util.List[Entry[K, V]]) {
     metrics.puts.inc(entries.size)
     val iter = entries.iterator
@@ -79,8 +79,8 @@ class LoggedStore[K, V](
   }
 
   /**
-   * Perform the local delete and log it out to the changelog
-   */
+    * Perform the local delete and log it out to the changelog
+    */
   def delete(key: K) {
     metrics.deletes.inc
     collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, null))
@@ -88,8 +88,8 @@ class LoggedStore[K, V](
   }
 
   /**
-   * Perform the local deletes and log them out to the changelog
-   */
+    * Perform the local deletes and log them out to the changelog
+    */
   override def deleteAll(keys: java.util.List[K]) = {
     metrics.deletes.inc(keys.size)
     val keysIterator = keys.iterator
@@ -117,4 +117,4 @@ class LoggedStore[K, V](
   override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     store.snapshot(from, to)
   }
-}
+}
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/RecordTooLargeException.java b/samza-kv/src/main/scala/org/apache/samza/storage/kv/RecordTooLargeException.java
new file mode 100644
index 0000000..6d3e244
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/RecordTooLargeException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import org.apache.samza.SamzaException;
+
+
+public class RecordTooLargeException extends SamzaException {
+
+  public RecordTooLargeException(String s) {
+    super(s);
+  }
+}
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLargeMessageSafeStore.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLargeMessageSafeStore.java
new file mode 100644
index 0000000..f1a303c
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLargeMessageSafeStore.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.LongSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestLargeMessageSafeStore {
+
+  @Mock
+  KeyValueStore<byte[], byte[]> store;
+  int maxMessageSize = 1024;
+  String storeName = "testStore";
+
+  @Before
+  public void setup() {
+    Mockito.doNothing().when(store).put(Matchers.any(), Matchers.any());
+    Mockito.doNothing().when(store).putAll(Matchers.any());
+  }
+
+  @Test
+  public void testSmallMessagePut() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, false, maxMessageSize);
+
+    byte[] key = new byte[16];
+    byte[] smallMessage = new byte[32];
+    largeMessageSafeKeyValueStore.put(key, smallMessage);
+    Mockito.verify(store).put(Matchers.eq(key), Matchers.eq(smallMessage));
+  }
+
+  @Test
+  public void testLargeMessagePutWithDropLargeMessageDisabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, false, maxMessageSize);
+
+    byte[] key = new byte[16];
+    byte[] largeMessage = new byte[maxMessageSize + 1];
+    try {
+      largeMessageSafeKeyValueStore.put(key, largeMessage);
+      Assert.fail("The test case should have failed due to a large message being passed to the changelog, but it didn't.");
+    } catch (RecordTooLargeException e) {
+      Mockito.verifyZeroInteractions(store);
+    }
+  }
+
+  @Test
+  public void testSmallMessagePutAllSuccessWithDropLargeMessageDisabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, false, maxMessageSize);
+    byte[] key = new byte[16];
+    byte[] smallMessage = new byte[32];
+
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    entries.add(new Entry<>(key, smallMessage));
+
+    largeMessageSafeKeyValueStore.putAll(entries);
+    Mockito.verify(store).putAll(Matchers.eq(entries));
+    Mockito.verify(store, Mockito.never()).put(Matchers.any(byte[].class), Matchers.any(byte[].class));
+  }
+
+  @Test
+  public void testLargeMessagePutAllFailureWithDropLargeMessageDisabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, false, maxMessageSize);
+    byte[] key = new byte[16];
+    byte[] largeMessage = new byte[maxMessageSize + 1];
+
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    entries.add(new Entry<>(key, largeMessage));
+
+    try {
+      largeMessageSafeKeyValueStore.putAll(entries);
+      Assert.fail("The test case should have failed due to a large message being passed to the changelog, but it didn't.");
+    } catch (RecordTooLargeException e) {
+      Mockito.verifyZeroInteractions(store);
+    }
+  }
+
+  @Test
+  public void testSmallMessagePutWithSerdeAndDropLargeMessageDisabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, false, maxMessageSize);
+
+    Serde<Long> longSerde = new LongSerde();
+    long longObj = 1000L;
+    byte[] key = longSerde.toBytes(longObj);
+
+    JsonSerdeV2<Map<String, Object>> jsonSerde = new JsonSerdeV2<>();
+    Map<String, Object> obj = new HashMap<>();
+    obj.put("jack", "jill");
+    obj.put("john", 2);
+    byte[] smallMessage = jsonSerde.toBytes(obj);
+
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    entries.add(new Entry<>(key, smallMessage));
+
+    largeMessageSafeKeyValueStore.putAll(entries);
+    Mockito.verify(store).putAll(Matchers.eq(entries));
+    Mockito.verify(store, Mockito.never()).put(Matchers.any(byte[].class), Matchers.any(byte[].class));
+  }
+
+  @Test
+  public void testLargeMessagePutWithSerdeAndDropLargeMessageDisabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, false, maxMessageSize);
+
+    Serde<Long> longSerde = new LongSerde();
+    long longObj = 1000L;
+    byte[] key = longSerde.toBytes(longObj);
+
+    Serde<String> stringSerde = new StringSerde();
+    String largeString = StringUtils.repeat("a", maxMessageSize + 1);
+    byte[] largeMessage = stringSerde.toBytes(largeString);
+
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    entries.add(new Entry<>(key, largeMessage));
+
+    try {
+      largeMessageSafeKeyValueStore.putAll(entries);
+    } catch (RecordTooLargeException e) {
+      Mockito.verifyZeroInteractions(store);
+    }
+  }
+
+  @Test
+  public void testSmallMessagePutSuccessWithDropLargeMessageEnabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, true, maxMessageSize);
+    byte[] key = new byte[16];
+    byte[] smallMessage = new byte[32];
+
+    largeMessageSafeKeyValueStore.put(key, smallMessage);
+
+    Mockito.verify(store).put(Matchers.eq(key), Matchers.eq(smallMessage));
+  }
+
+  @Test
+  public void testLargeMessagePutSuccessWithDropLargeMessageEnabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, true, maxMessageSize);
+    byte[] key = new byte[16];
+    byte[] largeMessage = new byte[maxMessageSize + 1];
+
+    largeMessageSafeKeyValueStore.put(key, largeMessage);
+
+    Mockito.verifyZeroInteractions(store);
+  }
+
+  @Test
+  public void testPutAllSuccessWithDropLargeMessageEnabled() {
+    LargeMessageSafeStore largeMessageSafeKeyValueStore = new LargeMessageSafeStore(store, storeName, true, maxMessageSize);
+    byte[] key1 = new byte[16];
+    byte[] largeMessage = new byte[maxMessageSize + 1];
+    byte[] key2 = new byte[8];
+    byte[] smallMessage = new byte[1];
+
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    Entry<byte[], byte[]> largeMessageEntry = new Entry<>(key1, largeMessage);
+    Entry<byte[], byte[]> smallMessageEntry = new Entry<>(key2, smallMessage);
+    entries.add(largeMessageEntry);
+    entries.add(smallMessageEntry);
+
+    largeMessageSafeKeyValueStore.putAll(entries);
+
+    entries.remove(largeMessageEntry);
+    Mockito.verify(store).putAll(Matchers.eq(entries));
+    Mockito.verify(store, Mockito.never()).put(Matchers.any(byte[].class), Matchers.any(byte[].class));
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TestLargeMessageSafeKeyValueStores.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TestLargeMessageSafeKeyValueStores.java
new file mode 100644
index 0000000..f6cc400
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TestLargeMessageSafeKeyValueStores.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.WriteOptions;
+import scala.Int;
+
+
+/**
+ * Test suite to check handling of large messages when attempting to write to the store.
+ */
+@RunWith(value = Parameterized.class)
+public class TestLargeMessageSafeKeyValueStores {
+
+  private String typeOfStore;
+  private String storeConfig;
+  private boolean dropLargeMessage;
+
+  private static File dir = new File(System.getProperty("java.io.tmpdir"), "rocksdb-test-" + new Random().nextInt(Int.MaxValue()));
+  private static Serde<String> stringSerde = new StringSerde();
+  private static String storeName = "testStore";
+  private static SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+  private static MetricsRegistry metricsRegistry = new MetricsRegistryMap();
+  private static LoggedStoreMetrics loggedStoreMetrics = new LoggedStoreMetrics(storeName, metricsRegistry);
+  private static KeyValueStoreMetrics keyValueStoreMetrics = new KeyValueStoreMetrics(storeName, metricsRegistry);
+  private static SerializedKeyValueStoreMetrics serializedKeyValueStoreMetrics = new SerializedKeyValueStoreMetrics(storeName, metricsRegistry);
+  private static CachedStoreMetrics cachedStoreMetrics = new CachedStoreMetrics(storeName, metricsRegistry);
+  private static int maxMessageSize = 1024;
+  private static int cacheSize = 1024;
+  private static int batchSize = 1;
+
+  private KeyValueStore<String, String> store = null;
+  private KeyValueStore<byte[], byte[]> loggedStore = null;
+
+  /**
+   * @param typeOfStore Defines the type of key-value store (e.g. "rocksdb" / "inmemory")
+   * @param storeConfig Defines the order in which the store wrappers are applied - serde / cache-then-serde / serde-then-cache
+   * @param dropLargeMessageStr Defines the value of the stores.*.drop.large.messages config, which drops large messages if true
+   */
+  public TestLargeMessageSafeKeyValueStores(String typeOfStore, String storeConfig, String dropLargeMessageStr) {
+    this.typeOfStore = typeOfStore;
+    this.storeConfig = storeConfig;
+    this.dropLargeMessage = Boolean.valueOf(dropLargeMessageStr);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<String[]> data() {
+    return Arrays.asList(new String[][] {
+        {"inmemory", "serde", "true"},
+        {"inmemory", "serde", "false"},
+        {"inmemory", "cache-then-serde", "true"},
+        {"inmemory", "cache-then-serde", "false"},
+        {"inmemory", "serde-then-cache", "false"},
+        {"inmemory", "serde-then-cache", "true"},
+        //RocksDB
+        {"rocksdb", "serde", "true"},
+        {"rocksdb", "serde", "false"},
+        {"rocksdb", "cache-then-serde", "true"},
+        {"rocksdb", "cache-then-serde", "false"},
+        {"rocksdb", "serde-then-cache", "false"},
+        {"rocksdb", "serde-then-cache", "true"}
+    });
+  }
+
+  @Before
+  public void setup() {
+
+    KeyValueStore<byte[], byte[]> kvStore;
+    switch (typeOfStore) {
+      case "inmemory" : {
+        kvStore = new InMemoryKeyValueStore(keyValueStoreMetrics);
+        break;
+      }
+      case "rocksdb" : {
+        kvStore = new RocksDbKeyValueStore(dir, new org.rocksdb.Options().setCreateIfMissing(true).setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION),
+            new MapConfig(), false, storeName,
+            new WriteOptions(), new FlushOptions(), keyValueStoreMetrics);
+        break;
+      }
+      default :
+        throw new IllegalArgumentException("Type of store undefined: " + typeOfStore);
+    }
+
+    MessageCollector collector = envelope -> {
+      int messageLength = ((byte[]) envelope.getMessage()).length;
+      if (messageLength > maxMessageSize) {
+        throw new SamzaException("Logged store message size " + messageLength + " for store " + storeName
+            + " was larger than the maximum allowed message size " + maxMessageSize + ".");
+      }
+    };
+    loggedStore = new LoggedStore<>(kvStore, systemStreamPartition, collector, loggedStoreMetrics);
+
+    switch (storeConfig) {
+      case "serde" : {
+        KeyValueStore<byte[], byte[]> largeMessageSafeStore =
+            new LargeMessageSafeStore(loggedStore, storeName, dropLargeMessage, maxMessageSize);
+        store = new SerializedKeyValueStore<>(largeMessageSafeStore, stringSerde, stringSerde,
+            serializedKeyValueStoreMetrics);
+        break;
+      }
+      case "cache-then-serde" : {
+        KeyValueStore<byte[], byte[]> toBeSerializedStore = loggedStore;
+        if (dropLargeMessage) {
+          toBeSerializedStore = new LargeMessageSafeStore(loggedStore, storeName, dropLargeMessage, maxMessageSize);
+        }
+        KeyValueStore<String, String> serializedStore =
+            new SerializedKeyValueStore<>(toBeSerializedStore, stringSerde, stringSerde, serializedKeyValueStoreMetrics);
+        store = new CachedStore<>(serializedStore, cacheSize, batchSize, cachedStoreMetrics);
+        break;
+      }
+      //For this case, the value of dropLargeMessage doesn't matter since we are testing the case when
+      // large messages are expected and StorageConfig.DISALLOW_LARGE_MESSAGES is true.
+      case "serde-then-cache" : {
+        KeyValueStore<byte[], byte[]> cachedStore =
+            new CachedStore<>(loggedStore, cacheSize, batchSize, cachedStoreMetrics);
+        KeyValueStore<byte[], byte[]> largeMessageSafeStore =
+            new LargeMessageSafeStore(cachedStore, storeName, dropLargeMessage, maxMessageSize);
+        store = new SerializedKeyValueStore<>(largeMessageSafeStore, stringSerde, stringSerde,
+            serializedKeyValueStoreMetrics);
+        break;
+      }
+      default :
+        throw new IllegalArgumentException("Store config undefined: " + storeConfig);
+    }
+    store = new NullSafeKeyValueStore<>(store);
+  }
+
+  @After
+  public void teardown() {
+    try {
+      store.close();
+    } catch (SamzaException e) {
+      loggedStore.close();
+    }
+    if (dir != null && dir.listFiles() != null) {
+      for (File file : dir.listFiles())
+        file.delete();
+      dir.delete();
+    }
+  }
+
+  @Test
+  public void testLargeMessagePut() {
+    String key = "test";
+    String largeMessage = StringUtils.repeat("a", maxMessageSize + 1);
+
+    if (dropLargeMessage) {
+      store.put(key, largeMessage);
+      Assert.assertNull("The large message was stored while it shouldn't have been.", loggedStore.get(stringSerde.toBytes(key)));
+    } else {
+      try {
+        store.put(key, largeMessage);
+        Assert.fail("Failure since put() method invocation incorrectly completed.");
+      } catch (SamzaException e) {
+        Assert.assertNull("The large message was stored while it shouldn't have been.", loggedStore.get(stringSerde.toBytes(key)));
+      }
+    }
+  }
+
+  @Test
+  public void testLargeMessagePutAll() {
+    String key = "test";
+    String largeMessage = StringUtils.repeat("a", maxMessageSize + 1);
+    List<Entry<String, String>> entries = new ArrayList<>();
+    entries.add(new Entry<>(key, largeMessage));
+
+    if (dropLargeMessage) {
+      store.putAll(entries);
+      Assert.assertNull("The large message was stored while it shouldn't have been.", loggedStore.get(stringSerde.toBytes(key)));
+    } else {
+      try {
+        store.putAll(entries);
+        Assert.fail("Failure since putAll() method invocation incorrectly completed.");
+      } catch (SamzaException e) {
+        Assert.assertNull("The large message was stored while it shouldn't have been.", loggedStore.get(stringSerde.toBytes(key)));
+      }
+    }
+  }
+
+  @Test
+  public void testSmallMessagePut() {
+    String key = "test";
+    String smallMessage = StringUtils.repeat("a", maxMessageSize - 1);
+
+    store.put(key, smallMessage);
+    Assert.assertEquals(store.get(key), smallMessage);
+  }
+
+  @Test
+  public void testSmallMessagePutAll() {
+    String key = "test";
+    String smallMessage = StringUtils.repeat("a", maxMessageSize - 1);
+
+    List<Entry<String, String>> entries = new ArrayList<>();
+    entries.add(new Entry<>(key, smallMessage));
+
+    store.putAll(entries);
+    Assert.assertEquals(store.get(key), smallMessage);
+  }
+}