You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:46:25 UTC

[GitHub] [samza] rmatharu commented on a change in pull request #1352: SAMZA-2516: Migrate BaseKeyValueStorageEngineFactory to be an abstract class instead of trait

rmatharu commented on a change in pull request #1352:
URL: https://github.com/apache/samza/pull/1352#discussion_r430593088



##########
File path: samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
+   * raw store.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   **/
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    Config storageConfigSubset = jobContext.getConfig().subset("stores." + storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
+    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!");

Review comment:
       Would it help to include the storeName in the error message, so the user knows which store is missing its factory definition?

##########
File path: samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
+   * raw store.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   **/
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    Config storageConfigSubset = jobContext.getConfig().subset("stores." + storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
+    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!");
+    }
+    if (!storeFactory.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
+      storePropertiesBuilder.setPersistedToDisk(true);
+    }
+    int batchSize = storageConfigSubset.getInt("write.batch.size", 500);
+    int cacheSize = storageConfigSubset.getInt("object.cache.size", Math.max(batchSize, 1000));

Review comment:
       Should we have variables for DEFAULT_BATCH_SIZE and DEFAULT_CACHE_SIZE?

##########
File path: samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
+   * raw store.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   **/
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    Config storageConfigSubset = jobContext.getConfig().subset("stores." + storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
+    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!");
+    }
+    if (!storeFactory.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
+      storePropertiesBuilder.setPersistedToDisk(true);
+    }
+    int batchSize = storageConfigSubset.getInt("write.batch.size", 500);
+    int cacheSize = storageConfigSubset.getInt("object.cache.size", Math.max(batchSize, 1000));
+    if (cacheSize > 0 && cacheSize < batchSize) {
+      throw new SamzaException(
+          "A store's cache.size cannot be less than batch.size as batched values reside in cache.");
+    }
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value storage.");
+    }
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key value storage.");

Review comment:
       Same as above: it might help to include the storeName in the error message.

##########
File path: samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
+   * raw store.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   **/
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    Config storageConfigSubset = jobContext.getConfig().subset("stores." + storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
+    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!");
+    }
+    if (!storeFactory.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
+      storePropertiesBuilder.setPersistedToDisk(true);
+    }
+    int batchSize = storageConfigSubset.getInt("write.batch.size", 500);
+    int cacheSize = storageConfigSubset.getInt("object.cache.size", Math.max(batchSize, 1000));
+    if (cacheSize > 0 && cacheSize < batchSize) {
+      throw new SamzaException(
+          "A store's cache.size cannot be less than batch.size as batched values reside in cache.");
+    }
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value storage.");
+    }
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key value storage.");
+    }
+
+    KeyValueStore<byte[], byte[]> rawStore =
+        getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, containerContext, storeMode);
+    KeyValueStore<byte[], byte[]> maybeLoggedStore = buildMaybeLoggedStore(changelogSSP,
+        storeName, registry, storePropertiesBuilder, rawStore, changelogCollector);
+    // this also applies serialization and caching layers
+    KeyValueStore<K, V> toBeAccessLoggedStore = applyLargeMessageHandling(storeName, registry,
+        maybeLoggedStore, storageConfig, cacheSize, batchSize, keySerde, msgSerde);
+    KeyValueStore<K, V> maybeAccessLoggedStore =
+        buildMaybeAccessLoggedStore(storeName, toBeAccessLoggedStore, changelogCollector, changelogSSP, storageConfig,
+            keySerde);
+    KeyValueStore<K, V> nullSafeStore = new NullSafeKeyValueStore<>(maybeAccessLoggedStore);
+
+    KeyValueStorageEngineMetrics keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry);
+    HighResolutionClock clock = buildClock(jobContext.getConfig());
+    return new KeyValueStorageEngine<>(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
+        changelogSSP, changelogCollector, keyValueStorageEngineMetrics, batchSize,
+        ScalaJavaUtil.toScalaFunction(clock::nanoTime));
+  }
+
+  private static KeyValueStore<byte[], byte[]> buildMaybeLoggedStore(SystemStreamPartition changelogSSP,

Review comment:
       /**
        * Wraps the given store in a LoggedStore if a changelog is defined for the store; otherwise returns the store unwrapped.
        */

##########
File path: samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
+   * raw store.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing.
+   * @param containerContext Information about the container in which the task is executing.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog.
+   * @param containerContext Information about the container in which the task is executing.
+   **/
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    Config storageConfigSubset = jobContext.getConfig().subset("stores." + storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
+    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!");
+    }
+    if (!storeFactory.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
+      storePropertiesBuilder.setPersistedToDisk(true);
+    }
+    int batchSize = storageConfigSubset.getInt("write.batch.size", 500);
+    int cacheSize = storageConfigSubset.getInt("object.cache.size", Math.max(batchSize, 1000));
+    if (cacheSize > 0 && cacheSize < batchSize) {
+      throw new SamzaException(
+          "A store's cache.size cannot be less than batch.size as batched values reside in cache.");
+    }
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value storage.");
+    }
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key value storage.");
+    }
+
+    KeyValueStore<byte[], byte[]> rawStore =
+        getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, containerContext, storeMode);
+    KeyValueStore<byte[], byte[]> maybeLoggedStore = buildMaybeLoggedStore(changelogSSP,
+        storeName, registry, storePropertiesBuilder, rawStore, changelogCollector);
+    // this also applies serialization and caching layers
+    KeyValueStore<K, V> toBeAccessLoggedStore = applyLargeMessageHandling(storeName, registry,
+        maybeLoggedStore, storageConfig, cacheSize, batchSize, keySerde, msgSerde);
+    KeyValueStore<K, V> maybeAccessLoggedStore =
+        buildMaybeAccessLoggedStore(storeName, toBeAccessLoggedStore, changelogCollector, changelogSSP, storageConfig,
+            keySerde);
+    KeyValueStore<K, V> nullSafeStore = new NullSafeKeyValueStore<>(maybeAccessLoggedStore);
+
+    KeyValueStorageEngineMetrics keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry);
+    HighResolutionClock clock = buildClock(jobContext.getConfig());
+    return new KeyValueStorageEngine<>(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
+        changelogSSP, changelogCollector, keyValueStorageEngineMetrics, batchSize,
+        ScalaJavaUtil.toScalaFunction(clock::nanoTime));
+  }
+
+  private static KeyValueStore<byte[], byte[]> buildMaybeLoggedStore(SystemStreamPartition changelogSSP,
+      String storeName,
+      MetricsRegistry registry,
+      StoreProperties.StorePropertiesBuilder storePropertiesBuilder,
+      KeyValueStore<byte[], byte[]> storeToWrap,
+      MessageCollector changelogCollector) {
+    if (changelogSSP == null) {
+      return storeToWrap;
+    } else {
+      LoggedStoreMetrics loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry);
+      storePropertiesBuilder.setLoggedStore(true);
+      return new LoggedStore<>(storeToWrap, changelogSSP, changelogCollector, loggedStoreMetrics);
+    }
+  }
+
+  private static <T, U> KeyValueStore<T, U> applyLargeMessageHandling(String storeName,

Review comment:
       nit: buildStoreWithLargeMessageHandling?

##########
File path: samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
+   * raw store.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing.
+   * @param containerContext Information about the container in which the task is executing.
+   * @param storeMode The StoreMode supplied by the caller of getStorageEngine, forwarded to this method as-is.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Builds the key-value StorageEngine for a store by stacking wrappers on top of the raw store obtained from
+   * getKVStore, and returns it to the caller.
+   *
+   * The per-store settings "write.batch.size" (default 500) and "object.cache.size" (default: the larger of the
+   * batch size and 1000) are read from the "stores.&lt;storeName&gt;." subset of the job config.
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store; must not be null.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store; must not be
+   *                 null.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog.
+   * @param jobContext Information about the job in which the task is executing; its config is the source of all
+   *                   store settings read here.
+   * @param containerContext Information about the container in which the task is executing.
+   * @param storeMode The StoreMode forwarded unchanged to getKVStore.
+   * @return The assembled key-value StorageEngine.
+   * @throws SamzaException if the store factory is missing or blank, if a positive cache size is smaller than the
+   *                        batch size, or if either serde is null.
+   */
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    Config perStoreConfig = jobContext.getConfig().subset("stores." + storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> configuredFactory = storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder propertiesBuilder = new StoreProperties.StorePropertiesBuilder();
+
+    // An absent factory is folded into null so that one blank check covers both the missing and the empty case.
+    String factoryClassName = configuredFactory.orElse(null);
+    if (StringUtils.isBlank(factoryClassName)) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!");
+    }
+    // Every factory other than the in-memory one gets flagged as persisted to disk.
+    boolean usesInMemoryFactory = INMEMORY_KV_STORAGE_ENGINE_FACTORY.equals(factoryClassName);
+    if (!usesInMemoryFactory) {
+      propertiesBuilder.setPersistedToDisk(true);
+    }
+
+    int batchSize = perStoreConfig.getInt("write.batch.size", 500);
+    int cacheSize = perStoreConfig.getInt("object.cache.size", Math.max(batchSize, 1000));
+    boolean positiveCacheBelowBatch = cacheSize > 0 && cacheSize < batchSize;
+    if (positiveCacheBelowBatch) {
+      throw new SamzaException(
+          "A store's cache.size cannot be less than batch.size as batched values reside in cache.");
+    }
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value storage.");
+    }
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key value storage.");
+    }
+
+    // Layer 1: the raw byte store supplied by the concrete factory.
+    KeyValueStore<byte[], byte[]> rawStore =
+        getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, containerContext, storeMode);
+    // Layer 2: changelog logging, only when a changelog partition was given.
+    KeyValueStore<byte[], byte[]> changelogAwareStore =
+        buildMaybeLoggedStore(changelogSSP, storeName, registry, propertiesBuilder, rawStore, changelogCollector);
+    // Layer 3: large message handling; this also applies serialization and caching layers.
+    KeyValueStore<K, V> typedStore =
+        applyLargeMessageHandling(storeName, registry, changelogAwareStore, storageConfig, cacheSize, batchSize,
+            keySerde, msgSerde);
+    // Layer 4: optional access logging, then null-safety as the outermost wrapper.
+    KeyValueStore<K, V> accessLogAwareStore =
+        buildMaybeAccessLoggedStore(storeName, typedStore, changelogCollector, changelogSSP, storageConfig, keySerde);
+    KeyValueStore<K, V> nullSafeStore = new NullSafeKeyValueStore<>(accessLogAwareStore);
+
+    KeyValueStorageEngineMetrics engineMetrics = new KeyValueStorageEngineMetrics(storeName, registry);
+    HighResolutionClock clock = buildClock(jobContext.getConfig());
+    return new KeyValueStorageEngine<>(storeName, storeDir, propertiesBuilder.build(), nullSafeStore, rawStore,
+        changelogSSP, changelogCollector, engineMetrics, batchSize,
+        ScalaJavaUtil.toScalaFunction(clock::nanoTime));
+  }
+
+  /**
+   * Optionally layers changelog logging on top of the given byte-level store.
+   *
+   * If no changelog partition is supplied, the store is handed back untouched. Otherwise the store properties are
+   * flagged as logged and the store is wrapped in a LoggedStore that is given the changelog partition, the
+   * changelog collector and a newly created LoggedStoreMetrics.
+   *
+   * @param changelogSSP Changelog partition of the store; null selects the unwrapped store.
+   * @param storeName Name of the store, used when creating the logged-store metrics.
+   * @param registry MetricsRegistry used when creating the logged-store metrics.
+   * @param storePropertiesBuilder Builder on which the logged-store flag is set when a changelog is present.
+   * @param storeToWrap The store to return as-is or to wrap.
+   * @param changelogCollector MessageCollector handed to the LoggedStore.
+   * @return Either storeToWrap itself or a LoggedStore wrapping it.
+   */
+  private static KeyValueStore<byte[], byte[]> buildMaybeLoggedStore(SystemStreamPartition changelogSSP,
+      String storeName,
+      MetricsRegistry registry,
+      StoreProperties.StorePropertiesBuilder storePropertiesBuilder,
+      KeyValueStore<byte[], byte[]> storeToWrap,
+      MessageCollector changelogCollector) {
+    boolean hasChangelog = changelogSSP != null;
+    if (!hasChangelog) {
+      return storeToWrap;
+    }
+    LoggedStoreMetrics metrics = new LoggedStoreMetrics(storeName, registry);
+    storePropertiesBuilder.setLoggedStore(true);
+    return new LoggedStore<>(storeToWrap, changelogSSP, changelogCollector, metrics);
+  }
+
+  private static <T, U> KeyValueStore<T, U> applyLargeMessageHandling(String storeName,
+      MetricsRegistry registry,
+      KeyValueStore<byte[], byte[]> storeToWrap,
+      StorageConfig storageConfig,
+      int cacheSize,
+      int batchSize,
+      Serde<T> keySerde,
+      Serde<U> msgSerde) {
+    int maxMessageSize = storageConfig.getChangelogMaxMsgSizeBytes(storeName);
+    if (storageConfig.getDisallowLargeMessages(storeName)) {
+      /*
+       * If large messages are disallowed in config, then this creates a LargeMessageSafeKeyValueStore that throws a
+       * RecordTooLargeException when a large message is encountered.
+       */
+      KeyValueStore<byte[], byte[]> maybeCachedStore =
+          buildMaybeCachedStore(storeName, registry, storeToWrap, cacheSize, batchSize);
+      LargeMessageSafeStore largeMessageSafeKeyValueStore =
+          new LargeMessageSafeStore(maybeCachedStore, storeName, false, maxMessageSize);
+      return buildSerializedStore(storeName, registry, largeMessageSafeKeyValueStore, keySerde, msgSerde);
+    } else {
+      KeyValueStore<byte[], byte[]> toBeSerializedStore;
+      if (storageConfig.getDropLargeMessages(storeName)) {
+        toBeSerializedStore = new LargeMessageSafeStore(storeToWrap, storeName, true, maxMessageSize);
+      } else {
+        toBeSerializedStore = storeToWrap;
+      }
+      KeyValueStore<T, U> serializedStore =
+          buildSerializedStore(storeName, registry, toBeSerializedStore, keySerde, msgSerde);
+      return buildMaybeCachedStore(storeName, registry, serializedStore, cacheSize, batchSize);

Review comment:
       +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org