You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/28 00:46:46 UTC

[GitHub] [samza] cameronlee314 commented on a change in pull request #1352: SAMZA-2516: Migrate BaseKeyValueStorageEngineFactory to be an abstract class instead of trait

cameronlee314 commented on a change in pull request #1352:
URL: https://github.com/apache/samza/pull/1352#discussion_r431518139



##########
File path: samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
+  // Fully-qualified class name of the in-memory store factory. getStorageEngine compares the configured factory
+  // class name against this value: any other factory causes the store to be marked as persisted to disk.
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
+   * raw store.
+   *
+   * <p>The returned store operates on raw bytes; {@link #getStorageEngine} layers logging, serialization, caching,
+   * large-message handling and null-safety on top of it.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog. This may be
+   *                                       null: getStorageEngine passes its changelog SSP through unchanged, and a
+   *                                       null value there means the store has no changelog.
+   * @param jobContext Information about the job in which the task is executing.
+   * @param containerContext Information about the container in which the task is executing.
+   * @param storeMode Mode in which the store is being opened; forwarded unchanged from getStorageEngine.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller.
+   *
+   * <p>The raw store from {@link #getKVStore} is wrapped, from the inside out, in:
+   * <ol>
+   *   <li>a changelog-logging layer, only when {@code changelogSSP} is non-null;</li>
+   *   <li>serialization, caching and large-message handling layers;</li>
+   *   <li>an access-log layer (presumably only added when access logging is enabled for the store — see
+   *       buildMaybeAccessLoggedStore);</li>
+   *   <li>a {@link NullSafeKeyValueStore}.</li>
+   * </ol>
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading or writing to the store. Must not be null.
+   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store. Must not be
+   *                 null.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog; if null, no logging layer
+   *                     is added and the store is not marked as logged.
+   * @param jobContext Information about the job in which the task is executing; its config supplies the store
+   *                   factory and the per-store settings.
+   * @param containerContext Information about the container in which the task is executing.
+   * @param storeMode Mode in which the store is being opened; forwarded to {@link #getKVStore}.
+   * @return a {@link KeyValueStorageEngine} wrapping the layered store
+   * @throws SamzaException if no (or a blank) store factory is configured for the store, if a positive object cache
+   *                        size is smaller than the write batch size, or if either serde is null
+   **/
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    // Per-store settings are read from the "stores.<storeName>." prefix, with the prefix stripped from the keys.
+    Config storageConfigSubset = jobContext.getConfig().subset("stores." + storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
+    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!");
+    }
+    // Every factory other than the in-memory one is treated as persisting to disk.
+    if (!storeFactory.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
+      storePropertiesBuilder.setPersistedToDisk(true);
+    }
+    // Defaults: write batch size 500; object cache size max(batchSize, 1000).
+    int batchSize = storageConfigSubset.getInt("write.batch.size", 500);
+    int cacheSize = storageConfigSubset.getInt("object.cache.size", Math.max(batchSize, 1000));
+    // A non-positive cache size is exempt from this check; otherwise the cache must be able to hold a full batch.
+    // NOTE(review): the message below says "cache.size"/"batch.size", but the keys actually read above are
+    // "object.cache.size" and "write.batch.size".
+    if (cacheSize > 0 && cacheSize < batchSize) {
+      throw new SamzaException(
+          "A store's cache.size cannot be less than batch.size as batched values reside in cache.");
+    }
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value storage.");
+    }
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key value storage.");
+    }
+
+    // Build the wrapper stack from the raw byte store outward.
+    KeyValueStore<byte[], byte[]> rawStore =
+        getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, containerContext, storeMode);
+    KeyValueStore<byte[], byte[]> maybeLoggedStore = buildMaybeLoggedStore(changelogSSP,
+        storeName, registry, storePropertiesBuilder, rawStore, changelogCollector);
+    // this also applies serialization and caching layers
+    KeyValueStore<K, V> toBeAccessLoggedStore = applyLargeMessageHandling(storeName, registry,
+        maybeLoggedStore, storageConfig, cacheSize, batchSize, keySerde, msgSerde);
+    KeyValueStore<K, V> maybeAccessLoggedStore =
+        buildMaybeAccessLoggedStore(storeName, toBeAccessLoggedStore, changelogCollector, changelogSSP, storageConfig,
+            keySerde);
+    KeyValueStore<K, V> nullSafeStore = new NullSafeKeyValueStore<>(maybeAccessLoggedStore);
+
+    KeyValueStorageEngineMetrics keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry);
+    HighResolutionClock clock = buildClock(jobContext.getConfig());
+    // The engine receives both the outermost wrapper (nullSafeStore) and the raw store, plus the clock's nanoTime
+    // adapted to a Scala function.
+    return new KeyValueStorageEngine<>(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
+        changelogSSP, changelogCollector, keyValueStorageEngineMetrics, batchSize,
+        ScalaJavaUtil.toScalaFunction(clock::nanoTime));
+  }
+
+  /**
+   * Optionally wraps the given byte store in a {@link LoggedStore} backed by the changelog SSP and collector.
+   *
+   * <p>If {@code changelogSSP} is null, {@code storeToWrap} is returned unchanged and the builder is not touched.
+   * Otherwise this creates new {@link LoggedStoreMetrics} for the store and, as a side effect, marks
+   * {@code storePropertiesBuilder} as a logged store.
+   *
+   * @param changelogSSP changelog partition, or null if the store has no changelog
+   * @param storeName name of the store, used for metrics
+   * @param registry registry for the logged-store metrics
+   * @param storePropertiesBuilder builder that is mutated to record that the store is logged
+   * @param storeToWrap the byte store to wrap
+   * @param changelogCollector collector passed to the LoggedStore
+   * @return either {@code storeToWrap} or a LoggedStore wrapping it
+   */
+  private static KeyValueStore<byte[], byte[]> buildMaybeLoggedStore(SystemStreamPartition changelogSSP,
+      String storeName,
+      MetricsRegistry registry,
+      StoreProperties.StorePropertiesBuilder storePropertiesBuilder,
+      KeyValueStore<byte[], byte[]> storeToWrap,
+      MessageCollector changelogCollector) {
+    if (changelogSSP == null) {
+      return storeToWrap;
+    } else {
+      LoggedStoreMetrics loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry);
+      storePropertiesBuilder.setLoggedStore(true);
+      return new LoggedStore<>(storeToWrap, changelogSSP, changelogCollector, loggedStoreMetrics);
+    }
+  }
+
+  private static <T, U> KeyValueStore<T, U> applyLargeMessageHandling(String storeName,
+      MetricsRegistry registry,
+      KeyValueStore<byte[], byte[]> storeToWrap,
+      StorageConfig storageConfig,
+      int cacheSize,
+      int batchSize,
+      Serde<T> keySerde,
+      Serde<U> msgSerde) {
+    int maxMessageSize = storageConfig.getChangelogMaxMsgSizeBytes(storeName);
+    if (storageConfig.getDisallowLargeMessages(storeName)) {
+      /*
+       * If large messages are disallowed in config, then this creates a LargeMessageSafeKeyValueStore that throws a
+       * RecordTooLargeException when a large message is encountered.
+       */
+      KeyValueStore<byte[], byte[]> maybeCachedStore =
+          buildMaybeCachedStore(storeName, registry, storeToWrap, cacheSize, batchSize);
+      LargeMessageSafeStore largeMessageSafeKeyValueStore =
+          new LargeMessageSafeStore(maybeCachedStore, storeName, false, maxMessageSize);
+      return buildSerializedStore(storeName, registry, largeMessageSafeKeyValueStore, keySerde, msgSerde);
+    } else {
+      KeyValueStore<byte[], byte[]> toBeSerializedStore;
+      if (storageConfig.getDropLargeMessages(storeName)) {
+        toBeSerializedStore = new LargeMessageSafeStore(storeToWrap, storeName, true, maxMessageSize);
+      } else {
+        toBeSerializedStore = storeToWrap;
+      }
+      KeyValueStore<T, U> serializedStore =
+          buildSerializedStore(storeName, registry, toBeSerializedStore, keySerde, msgSerde);
+      return buildMaybeCachedStore(storeName, registry, serializedStore, cacheSize, batchSize);

Review comment:
       Unfortunately, there is a difference. The description of https://github.com/apache/samza/pull/1008 has some more context, although it's not fully consistent with the code since it mentions a non-existent "expect.large.message" config.
   I think the idea is that it may be undesirable to cache a large message that cannot be written to the changelog. To prevent that, serialization (and the large-message check) must happen before the cached store, so an oversized value is rejected before it reaches the cache. However, if it is acceptable to cache large messages, putting the cache in front of serialization is more performant, since the deserialized object can be stored in the cache directly. Supporting both options is what requires these multiple flows.

##########
File path: samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestBaseKeyValueStorageEngineFactory {
+  private static final String STORE_NAME = "myStore";
+  private static final StorageEngineFactory.StoreMode STORE_MODE = StorageEngineFactory.StoreMode.ReadWrite;
+  private static final SystemStreamPartition CHANGELOG_SSP =
+      new SystemStreamPartition("system", "stream", new Partition(0));
+  private static final Map<String, String> BASE_CONFIG =
+      ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME),
+          MockKeyValueStorageEngineFactory.class.getName());
+  private static final Map<String, String> DISABLE_CACHE =
+      ImmutableMap.of(String.format("stores.%s.object.cache.size", STORE_NAME), "0");
+  private static final Map<String, String> DISALLOW_LARGE_MESSAGES =
+      ImmutableMap.of(String.format(StorageConfig.DISALLOW_LARGE_MESSAGES, STORE_NAME), "true");
+  private static final Map<String, String> DROP_LARGE_MESSAGES =
+      ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME), "true");
+  private static final Map<String, String> ACCESS_LOG_ENABLED =
+      ImmutableMap.of(String.format("stores.%s.accesslog.enabled", STORE_NAME), "true");
+
+  @Mock
+  private File storeDir;
+  @Mock
+  private Serde<String> keySerde;
+  @Mock
+  private Serde<String> msgSerde;
+  @Mock
+  private MessageCollector changelogCollector;
+  @Mock
+  private MetricsRegistry metricsRegistry;
+  @Mock
+  private JobContext jobContext;
+  @Mock
+  private ContainerContext containerContext;
+  @Mock
+  private KeyValueStore<byte[], byte[]> rawKeyValueStore;
+
+  @Before
+  public void setup() {
+    // Populate the @Mock fields declared above.
+    MockitoAnnotations.initMocks(this);
+    // some metrics objects need this for histogram metric instantiation
+    // (without the stub, the mocked registry's newGauge would return null)
+    when(this.metricsRegistry.newGauge(any(), any())).thenReturn(mock(Gauge.class));
+  }
+
+  // With no "stores.<name>.factory" entry in the config, store creation must fail with a SamzaException.
+  @Test(expected = SamzaException.class)
+  public void testMissingStoreFactory() {
+    Config config = new MapConfig();
+    callGetStorageEngine(config, null);
+  }
+
+  /**
+   * A positive object cache size that is smaller than the write batch size must be rejected, because batched values
+   * reside in the cache.
+   */
+  @Test(expected = SamzaException.class)
+  public void testInvalidCacheSize() {
+    // Keys must match what the factory reads under "stores.<name>.": "write.batch.size" and "object.cache.size".
+    // Using the real batch-size key ensures the configured batch size (100), not the default (500), drives the check.
+    Config config = new MapConfig(BASE_CONFIG,
+        ImmutableMap.of(String.format("stores.%s.write.batch.size", STORE_NAME), "100",
+            String.format("stores.%s.object.cache.size", STORE_NAME), "50"));
+    callGetStorageEngine(config, null);
+  }
+
+  // In-memory factory configured, cache disabled, no changelog: the store should be neither persisted nor logged,
+  // and the wrapper chain should be NullSafe -> Serialized -> raw store.
+  // NOTE(review): assertStoreProperties' boolean arguments presumably mean (persistedToDisk, loggedStore) — confirm
+  // against the helper.
+  @Test
+  public void testInMemoryKeyValueStore() {
+    Config config = new MapConfig(DISABLE_CACHE, ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME),
+        "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"));
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), false, false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
+    // config has the in-memory key-value factory, but still calling the test factory, so store will be the test store
+    assertEquals(this.rawKeyValueStore, serializedKeyValueStore.getStore());
+  }
+
+  // Non-in-memory factory, cache disabled, no changelog: the store is persisted to disk but not logged, and the
+  // wrapper chain is NullSafe -> Serialized -> raw store (no cache or logging layers).
+  @Test
+  public void testRawStoreOnly() {
+    Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE);
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
+    assertEquals(this.rawKeyValueStore, serializedKeyValueStore.getStore());
+  }
+
+  @Test
+  public void testWithLoggedStore() {
+    Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE);
+    StorageEngine storageEngine = callGetStorageEngine(config, CHANGELOG_SSP);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, true);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
+    LoggedStore<?, ?> loggedStore = assertAndCast(serializedKeyValueStore.getStore(), LoggedStore.class);
+    // noinspection AssertEqualsBetweenInconvertibleTypes

Review comment:
       This comment suppresses the IDE inspection warning on the next line. The warning is raised because the type of the first object is `KeyValueStore<byte[], byte[]>` and the type of the second is `KeyValueStore<?, ?>`. The generic type parameters don't matter in these tests, since we are only doing top-level type checks and object-equality comparisons.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org