You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:43 UTC
[03/12] samza git commit: Consolidating package names for System,
Stream, Application and Table descriptors.
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java
new file mode 100644
index 0000000..0b48e6b
--- /dev/null
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestInMemoryTableProvider {
+ @Test
+ public void testGenerateConfig() {
+ Map<String, String> tableSpecConfig = new HashMap<>();
+ tableSpecConfig.put("inmemory.c1", "c1-value");
+ tableSpecConfig.put("inmemory.c2", "c2-value");
+ tableSpecConfig.put("c3", "c3-value");
+ tableSpecConfig.put("c4", "c4-value");
+
+ TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
+ "my-table-provider-factory", tableSpecConfig);
+
+ Map<String, String> generatedConfig = new HashMap<>();
+ generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+ generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+ TableProvider tableProvider = new InMemoryTableProvider(tableSpec);
+ Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
+
+ Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+ Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+ Assert.assertEquals(
+ InMemoryKeyValueStorageEngineFactory.class.getName(),
+ tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+ Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
+ Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+ Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
+ Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
deleted file mode 100644
index 50bc2c2..0000000
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Table descriptor for RocksDb backed tables
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, RocksDbTableDescriptor<K, V>> {
-
- static final public String WRITE_BATCH_SIZE = "write.batch.size";
- static final public String OBJECT_CACHE_SIZE = "object.cache.size";
- static final public String CONTAINER_CACHE_SIZE_BYTES = "container.cache.size.bytes";
- static final public String CONTAINER_WRITE_BUFFER_SIZE_BYTES = "container.write.buffer.size.bytes";
- static final public String ROCKSDB_COMPRESSION = "rocksdb.compression";
- static final public String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
- static final public String ROCKSDB_TTL_MS = "rocksdb.ttl.ms";
- static final public String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
- static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
- static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
- static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
-
- private Integer writeBatchSize;
- private Integer objectCacheSize;
- private Integer cacheSize;
- private Integer writeBufferSize;
- private Integer blockSize;
- private Integer ttl;
- private Integer numWriteBuffers;
- private Integer maxLogFileSize;
- private Integer numLogFilesToKeep;
- private String compressionType;
- private String compactionStyle;
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- */
- public RocksDbTableDescriptor(String tableId) {
- super(tableId);
- }
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- * @param serde the serde for key and value
- */
- public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) {
- super(tableId, serde);
- }
-
- /**
- * For better write performance, the storage engine buffers writes and applies them to the
- * underlying store in a batch. If the same key is written multiple times in quick succession,
- * this buffer also deduplicates writes to the same key. This property is set to the number
- * of key/value pairs that should be kept in this in-memory buffer, per task instance.
- * The number cannot be greater than {@link #withObjectCacheSize}.
- * <p>
- * Default value is 500.
- * <p>
- * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
- *
- * @param writeBatchSize write batch size
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor withWriteBatchSize(int writeBatchSize) {
- this.writeBatchSize = writeBatchSize;
- return this;
- }
-
- /**
- * Samza maintains an additional cache in front of RocksDB for frequently-accessed objects.
- * This cache contains deserialized objects (avoiding the deserialization overhead on cache
- * hits), in contrast to the RocksDB block cache ({@link #withCacheSize}), which caches
- * serialized objects. This property determines the number of objects to keep in Samza's
- * cache, per task instance. This same cache is also used for write buffering
- * (see {@link #withWriteBatchSize}). A value of 0 disables all caching and batching.
- * <p>
- * Default value is 1,000.
- * <p>
- * Refer to <code>stores.store-name.object.cache.size</code> in Samza configuration guide
- *
- * @param objectCacheSize the object cache size
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor withObjectCacheSize(int objectCacheSize) {
- this.objectCacheSize = objectCacheSize;
- return this;
- }
-
- /**
- * The size of RocksDB's block cache in bytes, per container. If there are several task
- * instances within one container, each is given a proportional share of this cache.
- * Note that this is an off-heap memory allocation, so the container's total memory
- * use is the maximum JVM heap size plus the size of this cache.
- * <p>
- * Default value is 104,857,600.
- * <p>
- * Refer to <code>stores.store-name.container.cache.size.bytes</code> in Samza configuration guide
- *
- * @param cacheSize the cache size in bytes
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withCacheSize(int cacheSize) {
- this.cacheSize = cacheSize;
- return this;
- }
-
- /**
- * The amount of memory (in bytes) that RocksDB uses for buffering writes before they are
- * written to disk, per container. If there are several task instances within one container,
- * each is given a proportional share of this buffer. This setting also determines the
- * size of RocksDB's segment files.
- * <p>
- * Default value is 33,554,432.
- * <p>
- * Refer to <code>stores.store-name.container.write.buffer.size.bytes</code> in Samza configuration guide
- *
- * @param writeBufferSize the write buffer size in bytes
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withWriteBufferSize(int writeBufferSize) {
- this.writeBufferSize = writeBufferSize;
- return this;
- }
-
- /**
- * Controls whether RocksDB should compress data on disk and in the block cache.
- * The following values are valid:
- * <ul>
- * <li><b>snappy</b> Compress data using the <a href="https://code.google.com/p/snappy/">Snappy</a> codec.
- * <li><b>bzip2</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Bzip2">bzip2</a> codec.
- * <li><b>zlib</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Zlib">zlib</a> codec.
- * <li><b>lz4</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4</a> codec.
- * <li><b>lz4hc</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4hc</a> (high compression) codec.
- * <li><b>none</b> Do not compress data.
- * </ul>
- * <p>
- * Default value is snappy.
- * <p>
- * Refer to <code>stores.store-name.rocksdb.compression</code> in Samza configuration guide
- *
- * @param compressionType the compression type
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withCompressionType(String compressionType) {
- this.compressionType = compressionType;
- return this;
- }
-
- /**
- * If compression is enabled, RocksDB groups approximately this many uncompressed
- * bytes into one compressed block. You probably don't need to change this property.
- * <p>
- * Default value is 4,096.
- * <p>
- * Refer to <code>stores.store-name.rocksdb.block.size.bytes</code> in Samza configuration guide
- *
- * @param blockSize the block size in bytes
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withBlockSize(int blockSize) {
- this.blockSize = blockSize;
- return this;
- }
-
- /**
- * The time-to-live of the store. Please note it's not a strict TTL limit (removed
- * only after compaction). Please use caution opening a database with and without
- * TTL, as it might corrupt the database. Please make sure to read the
- * <a href="https://github.com/facebook/rocksdb/wiki/Time-to-Live">constraints</a>
- * before using.
- * <p>
- * Refer to <code>stores.store-name.rocksdb.ttl.ms</code> in Samza configuration guide
- *
- * @param ttl the time to live in milliseconds
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withTtl(int ttl) {
- this.ttl = ttl;
- return this;
- }
-
- /**
- * This property controls the compaction style that RocksDB will employ when compacting
- * its levels. The following values are valid:
- * <ul>
- * <li><b>universal</b> Use <a href="https://github.com/facebook/rocksdb/wiki/Universal-Compaction">universal</a> compaction.
- * <li><b>fifo</b> Use <a href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> compaction.
- * <li><b>level</b> Use RocksDB's standard leveled compaction.
- * </ul>
- * <p>
- * Default value is universal.
- * <p>
- * Refer to <code>stores.store-name.rocksdb.compaction.style</code> in Samza configuration guide
- *
- * @param compactionStyle the compaction style
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withCompactionStyle(String compactionStyle) {
- this.compactionStyle = compactionStyle;
- return this;
- }
-
- /**
- * Configures the
- * <a href="https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer">
- * number of write buffers</a> that a RocksDB store uses. This allows RocksDB
- * to continue taking writes to other buffers even while a given write buffer is being
- * flushed to disk.
- * <p>
- * Default value is 3.
- * <p>
- * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in Samza configuration guide
- *
- * @param numWriteBuffers the number of write buffers
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withNumWriteBuffers(int numWriteBuffers) {
- this.numWriteBuffers = numWriteBuffers;
- return this;
- }
-
- /**
- * The maximum size in bytes of the RocksDB LOG file before it is rotated.
- * <p>
- * Default value is 67,108,864.
- * <p>
- * Refer to <code>stores.store-name.rocksdb.max.log.file.size.bytes</code> in Samza configuration guide
- *
- * @param maxLogFileSize the maximal log file size in bytes
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withMaxLogFileSize(int maxLogFileSize) {
- this.maxLogFileSize = maxLogFileSize;
- return this;
- }
-
- /**
- * The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
- * <p>
- * Default value is 2.
- * <p>
- * Refer to <code>stores.store-name.rocksdb.keep.log.file.num</code> in Samza configuration guide
- *
- * @param numLogFilesToKeep the number of log files to keep
- * @return this table descriptor instance
- */
- public RocksDbTableDescriptor<K, V> withNumLogFilesToKeep(int numLogFilesToKeep) {
- this.numLogFilesToKeep = numLogFilesToKeep;
- return this;
- }
-
- /**
- * Create a table spec based on this table description
- * @return the table spec
- */
- @Override
- public TableSpec getTableSpec() {
-
- validate();
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
-
- return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig,
- sideInputs, sideInputsProcessor);
- }
-
- @Override
- protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
-
- super.generateTableSpecConfig(tableSpecConfig);
-
- if (writeBatchSize != null) {
- addRocksDbConfig(tableSpecConfig, WRITE_BATCH_SIZE, writeBatchSize.toString());
- }
- if (objectCacheSize != null) {
- addRocksDbConfig(tableSpecConfig, OBJECT_CACHE_SIZE, objectCacheSize.toString());
- }
- if (cacheSize != null) {
- addRocksDbConfig(tableSpecConfig, CONTAINER_CACHE_SIZE_BYTES, cacheSize.toString());
- }
- if (writeBufferSize != null) {
- addRocksDbConfig(tableSpecConfig, CONTAINER_WRITE_BUFFER_SIZE_BYTES, writeBufferSize.toString());
- }
- if (compressionType != null) {
- addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPRESSION, compressionType);
- }
- if (blockSize != null) {
- addRocksDbConfig(tableSpecConfig, ROCKSDB_BLOCK_SIZE_BYTES, blockSize.toString());
- }
- if (ttl != null) {
- addRocksDbConfig(tableSpecConfig, ROCKSDB_TTL_MS, ttl.toString());
- }
- if (compactionStyle != null) {
- addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPACTION_STYLE, compactionStyle);
- }
- if (numWriteBuffers != null) {
- addRocksDbConfig(tableSpecConfig, ROCKSDB_NUM_WRITE_BUFFERS, numWriteBuffers.toString());
- }
- if (maxLogFileSize != null) {
- addRocksDbConfig(tableSpecConfig, ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, maxLogFileSize.toString());
- }
- if (numLogFilesToKeep != null) {
- addRocksDbConfig(tableSpecConfig, ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString());
- }
- }
-
- private void addRocksDbConfig(Map<String, String> map, String key, String value) {
- map.put("rocksdb." + key, value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
deleted file mode 100644
index df60a5a..0000000
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.config.ClusterManagerConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Table provider for tables backed by RocksDb.
- */
-public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider {
-
- public RocksDbTableProvider(TableSpec tableSpec) {
- super(tableSpec);
- }
-
- @Override
- public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
-
- Map<String, String> tableConfig = new HashMap<>();
-
- // Store factory configuration
- tableConfig.put(String.format(
- StorageConfig.FACTORY(), tableSpec.getId()),
- RocksDbKeyValueStorageEngineFactory.class.getName());
-
- // Common store configuration
- tableConfig.putAll(generateCommonStoreConfig(jobConfig, generatedConfig));
-
- // Rest of the configuration
- tableSpec.getConfig().entrySet().stream()
- .filter(e -> !e.getKey().startsWith("internal."))
- .forEach(e -> {
- String k = e.getKey();
- String v = e.getValue();
- String realKey = k.startsWith("rocksdb.")
- ? String.format("stores.%s", tableSpec.getId()) + "." + k.substring("rocksdb.".length())
- : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
- tableConfig.put(realKey, v);
- });
-
- // Enable host affinity
- tableConfig.put(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, "true");
-
- logger.info("Generated configuration for table " + tableSpec.getId());
-
- return tableConfig;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
deleted file mode 100644
index dbe0f97..0000000
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-
-public class RocksDbTableProviderFactory implements TableProviderFactory {
- @Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- return new RocksDbTableProvider(tableSpec);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
new file mode 100644
index 0000000..a8ede78
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for RocksDb backed tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, RocksDbTableDescriptor<K, V>> {
+
+ static final public String WRITE_BATCH_SIZE = "write.batch.size";
+ static final public String OBJECT_CACHE_SIZE = "object.cache.size";
+ static final public String CONTAINER_CACHE_SIZE_BYTES = "container.cache.size.bytes";
+ static final public String CONTAINER_WRITE_BUFFER_SIZE_BYTES = "container.write.buffer.size.bytes";
+ static final public String ROCKSDB_COMPRESSION = "rocksdb.compression";
+ static final public String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
+ static final public String ROCKSDB_TTL_MS = "rocksdb.ttl.ms";
+ static final public String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
+ static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
+ static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
+ static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+
+ private Integer writeBatchSize;
+ private Integer objectCacheSize;
+ private Integer cacheSize;
+ private Integer writeBufferSize;
+ private Integer blockSize;
+ private Integer ttl;
+ private Integer numWriteBuffers;
+ private Integer maxLogFileSize;
+ private Integer numLogFilesToKeep;
+ private String compressionType;
+ private String compactionStyle;
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ */
+ public RocksDbTableDescriptor(String tableId) {
+ super(tableId);
+ }
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
+ /**
+ * For better write performance, the storage engine buffers writes and applies them to the
+ * underlying store in a batch. If the same key is written multiple times in quick succession,
+ * this buffer also deduplicates writes to the same key. This property is set to the number
+ * of key/value pairs that should be kept in this in-memory buffer, per task instance.
+ * The number cannot be greater than {@link #withObjectCacheSize}.
+ * <p>
+ * Default value is 500.
+ * <p>
+ * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
+ *
+ * @param writeBatchSize write batch size
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor withWriteBatchSize(int writeBatchSize) {
+ this.writeBatchSize = writeBatchSize;
+ return this;
+ }
+
+ /**
+ * Samza maintains an additional cache in front of RocksDB for frequently-accessed objects.
+ * This cache contains deserialized objects (avoiding the deserialization overhead on cache
+ * hits), in contrast to the RocksDB block cache ({@link #withCacheSize}), which caches
+ * serialized objects. This property determines the number of objects to keep in Samza's
+ * cache, per task instance. This same cache is also used for write buffering
+ * (see {@link #withWriteBatchSize}). A value of 0 disables all caching and batching.
+ * <p>
+ * Default value is 1,000.
+ * <p>
+ * Refer to <code>stores.store-name.object.cache.size</code> in Samza configuration guide
+ *
+ * @param objectCacheSize the object cache size
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor withObjectCacheSize(int objectCacheSize) {
+ this.objectCacheSize = objectCacheSize;
+ return this;
+ }
+
+ /**
+ * The size of RocksDB's block cache in bytes, per container. If there are several task
+ * instances within one container, each is given a proportional share of this cache.
+ * Note that this is an off-heap memory allocation, so the container's total memory
+ * use is the maximum JVM heap size plus the size of this cache.
+ * <p>
+ * Default value is 104,857,600.
+ * <p>
+ * Refer to <code>stores.store-name.container.cache.size.bytes</code> in Samza configuration guide
+ *
+ * @param cacheSize the cache size in bytes
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ return this;
+ }
+
+ /**
+ * The amount of memory (in bytes) that RocksDB uses for buffering writes before they are
+ * written to disk, per container. If there are several task instances within one container,
+ * each is given a proportional share of this buffer. This setting also determines the
+ * size of RocksDB's segment files.
+ * <p>
+ * Default value is 33,554,432.
+ * <p>
+ * Refer to <code>stores.store-name.container.write.buffer.size.bytes</code> in Samza configuration guide
+ *
+ * @param writeBufferSize the write buffer size in bytes
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withWriteBufferSize(int writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ return this;
+ }
+
+ /**
+ * Controls whether RocksDB should compress data on disk and in the block cache.
+ * The following values are valid:
+ * <ul>
+ * <li><b>snappy</b> Compress data using the <a href="https://code.google.com/p/snappy/">Snappy</a> codec.
+ * <li><b>bzip2</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Bzip2">bzip2</a> codec.
+ * <li><b>zlib</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Zlib">zlib</a> codec.
+ * <li><b>lz4</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4</a> codec.
+ * <li><b>lz4hc</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4hc</a> (high compression) codec.
+ * <li><b>none</b> Do not compress data.
+ * </ul>
+ * <p>
+ * Default value is snappy.
+ * <p>
+ * Refer to <code>stores.store-name.rocksdb.compression</code> in Samza configuration guide
+ *
+ * @param compressionType the compression type
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withCompressionType(String compressionType) {
+ this.compressionType = compressionType;
+ return this;
+ }
+
+ /**
+ * If compression is enabled, RocksDB groups approximately this many uncompressed
+ * bytes into one compressed block. You probably don't need to change this property.
+ * <p>
+ * Default value is 4,096.
+ * <p>
+ * Refer to <code>stores.store-name.rocksdb.block.size.bytes</code> in Samza configuration guide
+ *
+ * @param blockSize the block size in bytes
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withBlockSize(int blockSize) {
+ this.blockSize = blockSize;
+ return this;
+ }
+
+ /**
+ * The time-to-live of the store. Please note it's not a strict TTL limit (removed
+ * only after compaction). Please use caution opening a database with and without
+ * TTL, as it might corrupt the database. Please make sure to read the
+ * <a href="https://github.com/facebook/rocksdb/wiki/Time-to-Live">constraints</a>
+ * before using.
+ * <p>
+ * Refer to <code>stores.store-name.rocksdb.ttl.ms</code> in Samza configuration guide
+ *
+ * @param ttl the time to live in milliseconds
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withTtl(int ttl) {
+ this.ttl = ttl;
+ return this;
+ }
+
+ /**
+ * This property controls the compaction style that RocksDB will employ when compacting
+ * its levels. The following values are valid:
+ * <ul>
+ * <li><b>universal</b> Use <a href="https://github.com/facebook/rocksdb/wiki/Universal-Compaction">universal</a> compaction.
+ * <li><b>fifo</b> Use <a href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> compaction.
+ * <li><b>level</b> Use RocksDB's standard leveled compaction.
+ * </ul>
+ * <p>
+ * Default value is universal.
+ * <p>
+ * Refer to <code>stores.store-name.rocksdb.compaction.style</code> in Samza configuration guide
+ *
+ * @param compactionStyle the compaction style
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withCompactionStyle(String compactionStyle) {
+ this.compactionStyle = compactionStyle;
+ return this;
+ }
+
+ /**
+ * Configures the
+ * <a href="https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer">
+ * number of write buffers</a> that a RocksDB store uses. This allows RocksDB
+ * to continue taking writes to other buffers even while a given write buffer is being
+ * flushed to disk.
+ * <p>
+ * Default value is 3.
+ * <p>
+ * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in Samza configuration guide
+ *
+ * @param numWriteBuffers the number of write buffers
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withNumWriteBuffers(int numWriteBuffers) {
+ this.numWriteBuffers = numWriteBuffers;
+ return this;
+ }
+
+ /**
+ * The maximum size in bytes of the RocksDB LOG file before it is rotated.
+ * <p>
+ * Default value is 67,108,864.
+ * <p>
+ * Refer to <code>stores.store-name.rocksdb.max.log.file.size.bytes</code> in Samza configuration guide
+ *
+ * @param maxLogFileSize the maximal log file size in bytes
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withMaxLogFileSize(int maxLogFileSize) {
+ this.maxLogFileSize = maxLogFileSize;
+ return this;
+ }
+
+ /**
+ * The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
+ * <p>
+ * Default value is 2.
+ * <p>
+ * Refer to <code>stores.store-name.rocksdb.keep.log.file.num</code> in Samza configuration guide
+ *
+ * @param numLogFilesToKeep the number of log files to keep
+ * @return this table descriptor instance
+ */
+ public RocksDbTableDescriptor<K, V> withNumLogFilesToKeep(int numLogFilesToKeep) {
+ this.numLogFilesToKeep = numLogFilesToKeep;
+ return this;
+ }
+
+ /**
+ * Create a table spec based on this table description
+ * @return the table spec
+ */
+ @Override
+ public TableSpec getTableSpec() {
+
+ validate();
+
+ Map<String, String> tableSpecConfig = new HashMap<>();
+ generateTableSpecConfig(tableSpecConfig);
+
+ return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig,
+ sideInputs, sideInputsProcessor);
+ }
+
+ @Override
+ protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+
+ super.generateTableSpecConfig(tableSpecConfig);
+
+ if (writeBatchSize != null) {
+ addRocksDbConfig(tableSpecConfig, WRITE_BATCH_SIZE, writeBatchSize.toString());
+ }
+ if (objectCacheSize != null) {
+ addRocksDbConfig(tableSpecConfig, OBJECT_CACHE_SIZE, objectCacheSize.toString());
+ }
+ if (cacheSize != null) {
+ addRocksDbConfig(tableSpecConfig, CONTAINER_CACHE_SIZE_BYTES, cacheSize.toString());
+ }
+ if (writeBufferSize != null) {
+ addRocksDbConfig(tableSpecConfig, CONTAINER_WRITE_BUFFER_SIZE_BYTES, writeBufferSize.toString());
+ }
+ if (compressionType != null) {
+ addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPRESSION, compressionType);
+ }
+ if (blockSize != null) {
+ addRocksDbConfig(tableSpecConfig, ROCKSDB_BLOCK_SIZE_BYTES, blockSize.toString());
+ }
+ if (ttl != null) {
+ addRocksDbConfig(tableSpecConfig, ROCKSDB_TTL_MS, ttl.toString());
+ }
+ if (compactionStyle != null) {
+ addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPACTION_STYLE, compactionStyle);
+ }
+ if (numWriteBuffers != null) {
+ addRocksDbConfig(tableSpecConfig, ROCKSDB_NUM_WRITE_BUFFERS, numWriteBuffers.toString());
+ }
+ if (maxLogFileSize != null) {
+ addRocksDbConfig(tableSpecConfig, ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, maxLogFileSize.toString());
+ }
+ if (numLogFilesToKeep != null) {
+ addRocksDbConfig(tableSpecConfig, ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString());
+ }
+ }
+
+ private void addRocksDbConfig(Map<String, String> map, String key, String value) {
+ map.put("rocksdb." + key, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java
new file mode 100644
index 0000000..e0c3355
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table provider for tables backed by RocksDb.
+ */
+public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider {
+
+ public RocksDbTableProvider(TableSpec tableSpec) {
+ super(tableSpec);
+ }
+
+ @Override
+ public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
+
+ Map<String, String> tableConfig = new HashMap<>();
+
+ // Store factory configuration
+ tableConfig.put(String.format(
+ StorageConfig.FACTORY(), tableSpec.getId()),
+ RocksDbKeyValueStorageEngineFactory.class.getName());
+
+ // Common store configuration
+ tableConfig.putAll(generateCommonStoreConfig(jobConfig, generatedConfig));
+
+ // Rest of the configuration
+ tableSpec.getConfig().entrySet().stream()
+ .filter(e -> !e.getKey().startsWith("internal."))
+ .forEach(e -> {
+ String k = e.getKey();
+ String v = e.getValue();
+ String realKey = k.startsWith("rocksdb.")
+ ? String.format("stores.%s", tableSpec.getId()) + "." + k.substring("rocksdb.".length())
+ : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
+ tableConfig.put(realKey, v);
+ });
+
+ // Enable host affinity
+ tableConfig.put(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, "true");
+
+ logger.info("Generated configuration for table " + tableSpec.getId());
+
+ return tableConfig;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java
new file mode 100644
index 0000000..74e74db
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+
+public class RocksDbTableProviderFactory implements TableProviderFactory {
+ @Override
+ public TableProvider getTableProvider(TableSpec tableSpec) {
+ return new RocksDbTableProvider(tableSpec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
deleted file mode 100644
index cd7e85c..0000000
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import junit.framework.Assert;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-
-public class TestRocksDbTableDescriptor {
-
- @Test
- public void testMinimal() {
- new RocksDbTableDescriptor<Integer, String>("1")
- .validate();
- }
-
- @Test
- public void testSerde() {
- TableSpec tableSpec = new RocksDbTableDescriptor("1",
- KVSerde.of(new IntegerSerde(), new StringSerde()))
- .getTableSpec();
- Assert.assertNotNull(tableSpec.getSerde());
- Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
- Assert.assertEquals(tableSpec.getSerde().getValueSerde().getClass(), StringSerde.class);
- }
-
- @Test
- public void testTableSpec() {
-
- TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
- KVSerde.of(new IntegerSerde(), new StringSerde()))
- .withBlockSize(1)
- .withCacheSize(2)
- .withCompactionStyle("fifo")
- .withCompressionType("snappy")
- .withMaxLogFileSize(3)
- .withNumLogFilesToKeep(4)
- .withNumWriteBuffers(5)
- .withObjectCacheSize(6)
- .withTtl(7)
- .withWriteBatchSize(8)
- .withWriteBufferSize(9)
- .withConfig("rocksdb.abc", "xyz")
- .getTableSpec();
-
- Assert.assertNotNull(tableSpec.getSerde());
- Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
- Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
- Assert.assertEquals("1", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES));
- Assert.assertEquals("2", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES));
- Assert.assertEquals("3", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES));
- Assert.assertEquals("4", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM));
- Assert.assertEquals("5", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS));
- Assert.assertEquals("6", getConfig(tableSpec, RocksDbTableDescriptor.OBJECT_CACHE_SIZE));
- Assert.assertEquals("7", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_TTL_MS));
- Assert.assertEquals("8", getConfig(tableSpec, RocksDbTableDescriptor.WRITE_BATCH_SIZE));
- Assert.assertEquals("9", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES));
- Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION));
- Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE));
- Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
- Assert.assertEquals("false", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
- }
-
- @Test
- public void testTableSpecWithChangelogEnabled() {
-
- TableSpec tableSpec = new RocksDbTableDescriptor("1", KVSerde.of(new IntegerSerde(), new StringSerde()))
- .withChangelogStream("changelog-$tream")
- .withChangelogReplicationFactor(10)
- .getTableSpec();
-
- Assert.assertEquals("10", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR));
- Assert.assertEquals("changelog-$tream", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM));
- Assert.assertEquals("true", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
- }
-
- private String getConfig(TableSpec tableSpec, String key) {
- return tableSpec.getConfig().get("rocksdb." + key);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
deleted file mode 100644
index 8ce061c..0000000
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-
-public class TestRocksDbTableProvider {
- @Test
- public void testGenerateConfig() {
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- tableSpecConfig.put("rocksdb.c1", "c1-value");
- tableSpecConfig.put("rocksdb.c2", "c2-value");
- tableSpecConfig.put("c3", "c3-value");
- tableSpecConfig.put("c4", "c4-value");
-
- TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
- "my-table-provider-factory", tableSpecConfig);
-
- Map<String, String> generatedConfig = new HashMap<>();
- generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
- generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
-
- TableProvider tableProvider = new RocksDbTableProvider(tableSpec);
- Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
-
- Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
- Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
- Assert.assertEquals(
- RocksDbKeyValueStorageEngineFactory.class.getName(),
- tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
- Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
- Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
- Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
- Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
new file mode 100644
index 0000000..86efea5
--- /dev/null
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import junit.framework.Assert;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+
+public class TestRocksDbTableDescriptor {
+
+ @Test
+ public void testMinimal() {
+ new RocksDbTableDescriptor<Integer, String>("1")
+ .validate();
+ }
+
+ @Test
+ public void testSerde() {
+ TableSpec tableSpec = new RocksDbTableDescriptor("1",
+ KVSerde.of(new IntegerSerde(), new StringSerde()))
+ .getTableSpec();
+ Assert.assertNotNull(tableSpec.getSerde());
+ Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
+ Assert.assertEquals(tableSpec.getSerde().getValueSerde().getClass(), StringSerde.class);
+ }
+
+ @Test
+ public void testTableSpec() {
+
+ TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
+ KVSerde.of(new IntegerSerde(), new StringSerde()))
+ .withBlockSize(1)
+ .withCacheSize(2)
+ .withCompactionStyle("fifo")
+ .withCompressionType("snappy")
+ .withMaxLogFileSize(3)
+ .withNumLogFilesToKeep(4)
+ .withNumWriteBuffers(5)
+ .withObjectCacheSize(6)
+ .withTtl(7)
+ .withWriteBatchSize(8)
+ .withWriteBufferSize(9)
+ .withConfig("rocksdb.abc", "xyz")
+ .getTableSpec();
+
+ Assert.assertNotNull(tableSpec.getSerde());
+ Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
+ Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
+ Assert.assertEquals("1", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES));
+ Assert.assertEquals("2", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES));
+ Assert.assertEquals("3", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES));
+ Assert.assertEquals("4", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM));
+ Assert.assertEquals("5", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS));
+ Assert.assertEquals("6", getConfig(tableSpec, RocksDbTableDescriptor.OBJECT_CACHE_SIZE));
+ Assert.assertEquals("7", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_TTL_MS));
+ Assert.assertEquals("8", getConfig(tableSpec, RocksDbTableDescriptor.WRITE_BATCH_SIZE));
+ Assert.assertEquals("9", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES));
+ Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION));
+ Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE));
+ Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
+ Assert.assertEquals("false", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
+ }
+
+ @Test
+ public void testTableSpecWithChangelogEnabled() {
+
+ TableSpec tableSpec = new RocksDbTableDescriptor("1", KVSerde.of(new IntegerSerde(), new StringSerde()))
+ .withChangelogStream("changelog-$tream")
+ .withChangelogReplicationFactor(10)
+ .getTableSpec();
+
+ Assert.assertEquals("10", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR));
+ Assert.assertEquals("changelog-$tream", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM));
+ Assert.assertEquals("true", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
+ }
+
+ private String getConfig(TableSpec tableSpec, String key) {
+ return tableSpec.getConfig().get("rocksdb." + key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java
new file mode 100644
index 0000000..5e4601d
--- /dev/null
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestRocksDbTableProvider {
+ @Test
+ public void testGenerateConfig() {
+
+ Map<String, String> tableSpecConfig = new HashMap<>();
+ tableSpecConfig.put("rocksdb.c1", "c1-value");
+ tableSpecConfig.put("rocksdb.c2", "c2-value");
+ tableSpecConfig.put("c3", "c3-value");
+ tableSpecConfig.put("c4", "c4-value");
+
+ TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
+ "my-table-provider-factory", tableSpecConfig);
+
+ Map<String, String> generatedConfig = new HashMap<>();
+ generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+ generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+ TableProvider tableProvider = new RocksDbTableProvider(tableSpec);
+ Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
+
+ Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+ Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+ Assert.assertEquals(
+ RocksDbKeyValueStorageEngineFactory.class.getName(),
+ tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+ Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
+ Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+ Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
+ Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
deleted file mode 100644
index 84e5fbe..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.storage.SideInputsProcessor;
-
-
-/**
- * Table descriptor for store backed tables.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- * @param <D> the type of the concrete table descriptor
- */
-abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>>
- extends BaseTableDescriptor<K, V, D> {
-
- static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
- static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
- static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
-
- protected List<String> sideInputs;
- protected SideInputsProcessor sideInputsProcessor;
- protected boolean enableChangelog;
- protected String changelogStream;
- protected Integer changelogReplicationFactor;
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- */
- public BaseLocalStoreBackedTableDescriptor(String tableId) {
- super(tableId);
- }
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- * @param serde the serde for key and value
- */
- public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) {
- super(tableId, serde);
- }
-
- public D withSideInputs(List<String> sideInputs) {
- this.sideInputs = sideInputs;
- // Disable changelog
- this.enableChangelog = false;
- this.changelogStream = null;
- this.changelogReplicationFactor = null;
- return (D) this;
- }
-
- public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
- this.sideInputsProcessor = sideInputsProcessor;
- return (D) this;
- }
-
- /**
- * Enable changelog for this table, by default changelog is disabled. When the
- * changelog stream name is not specified, it is automatically generated in
- * the format {@literal [job-name]-[job-id]-table-[table-id]}.
- * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
- *
- * @return this table descriptor instance
- */
- public D withChangelogEnabled() {
- this.enableChangelog = true;
- return (D) this;
- }
-
- /**
- * Samza stores are local to a container. If the container fails, the contents of
- * the store are lost. To prevent loss of data, you need to set this property to
- * configure a changelog stream: Samza then ensures that writes to the store are
- * replicated to this stream, and the store is restored from this stream after a
- * failure. The value of this property is given in the form system-name.stream-name.
- * The "system-name" part is optional. If it is omitted you must specify the system
- * in <code>job.changelog.system</code> config. Any output stream can be used as
- * changelog, but you must ensure that only one job ever writes to a given changelog
- * stream (each instance of a job and each store needs its own changelog stream).
- * <p>
- * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
- *
- * @param changelogStream changelog stream name
- * @return this table descriptor instance
- */
- public D withChangelogStream(String changelogStream) {
- this.enableChangelog = true;
- this.changelogStream = changelogStream;
- return (D) this;
- }
-
- /**
- * The property defines the number of replicas to use for the change log stream.
- * <p>
- * Default value is <code>stores.default.changelog.replication.factor</code>.
- * <p>
- * Refer to <code>stores.store-name.changelog.replication.factor</code> in Samza configuration guide
- *
- * @param replicationFactor replication factor
- * @return this table descriptor instance
- */
- public D withChangelogReplicationFactor(int replicationFactor) {
- this.enableChangelog = true;
- this.changelogReplicationFactor = replicationFactor;
- return (D) this;
- }
-
- @Override
- protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
- super.generateTableSpecConfig(tableSpecConfig);
-
- tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
- if (enableChangelog) {
- if (changelogStream != null) {
- tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
- }
- if (changelogReplicationFactor != null) {
- tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
- }
- }
- }
-
- /**
- * Validate that this table descriptor is constructed properly
- */
- protected void validate() {
- super.validate();
- if (sideInputs != null || sideInputsProcessor != null) {
- Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
- String.format("Invalid side input configuration for table: %s. " +
- "Both side inputs and the processor must be provided", tableId));
- }
- if (!enableChangelog) {
- Preconditions.checkState(changelogStream == null,
- String.format("Invalid changelog configuration for table: %s. Changelog " +
- "must be enabled, when changelog stream name is provided", tableId));
- Preconditions.checkState(changelogReplicationFactor == null,
- String.format("Invalid changelog configuration for table: %s. Changelog " +
- "must be enabled, when changelog replication factor is provided", tableId));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
deleted file mode 100644
index e56c977..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.utils.BaseTableProvider;
-import org.apache.samza.table.utils.SerdeUtils;
-
-
-/**
- * Base class for tables backed by Samza local stores. The backing stores are
- * injected during initialization of the table. Since the lifecycle
- * of the underlying stores are already managed by Samza container,
- * the table provider will not manage the lifecycle of the backing
- * stores.
- */
-abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvider {
- public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");
-
- protected KeyValueStore kvStore;
-
- public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
- super(tableSpec);
- }
-
- @Override
- public void init(Context context) {
- super.init(context);
-
- Preconditions.checkNotNull(this.context, "Must specify context for local tables.");
-
- kvStore = (KeyValueStore) this.context.getTaskContext().getStore(tableSpec.getId());
-
- if (kvStore == null) {
- throw new SamzaException(String.format(
- "Backing store for table %s was not injected by SamzaContainer", tableSpec.getId()));
- }
-
- logger.info("Initialized backing store for table " + tableSpec.getId());
- }
-
- @Override
- public Table getTable() {
- if (kvStore == null) {
- throw new SamzaException("Store not initialized for table " + tableSpec.getId());
- }
- ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
- table.init(this.context);
- return table;
- }
-
- protected Map<String, String> generateCommonStoreConfig(Config jobConfig, Map<String, String> generatedConfig) {
-
- Map<String, String> storeConfig = new HashMap<>();
-
- // serde configurations for tables are generated at top level by JobNodeConfigurationGenerator and are included
- // in the global jobConfig. generatedConfig has all table specific configuration generated from TableSpec, such
- // as TableProviderFactory, sideInputs, etc.
- // Merge the global jobConfig and generatedConfig to get full access to configuration needed to create local
- // store configuration
- Map<String, String> mergedConfigMap = new HashMap<>(jobConfig);
- mergedConfigMap.putAll(generatedConfig);
- JobConfig mergedJobConfig = new JobConfig(new MapConfig(mergedConfigMap));
- JavaTableConfig tableConfig = new JavaTableConfig(mergedJobConfig);
-
- String keySerde = tableConfig.getKeySerde(tableSpec.getId());
- storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde);
-
- String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
- storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
-
- List<String> sideInputs = tableSpec.getSideInputs();
- if (sideInputs != null && !sideInputs.isEmpty()) {
- sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
- "Side input stream %s doesn't confirm to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
- String formattedSideInputs = String.join(",", sideInputs);
- storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs);
- storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()),
- SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor()));
- }
-
- // Changelog configuration
- Boolean enableChangelog = Boolean.valueOf(
- tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
- if (enableChangelog) {
- String changelogStream = tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM);
- if (StringUtils.isEmpty(changelogStream)) {
- changelogStream = String.format("%s-%s-table-%s", mergedJobConfig.getName().get(), mergedJobConfig.getJobId(),
- tableSpec.getId());
- }
-
- Preconditions.checkState(isValidSystemStreamName(changelogStream), String.format(
- "Changelog stream %s doesn't confirm to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN));
- storeConfig.put(String.format(StorageConfig.CHANGELOG_STREAM(), tableSpec.getId()), changelogStream);
-
- String changelogReplicationFactor = tableSpec.getConfig().get(
- BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR);
- if (changelogReplicationFactor != null) {
- storeConfig.put(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), tableSpec.getId()),
- changelogReplicationFactor);
- }
- }
-
- return storeConfig;
- }
-
- @Override
- public void close() {
- logger.info("Shutting down table provider for table " + tableSpec.getId());
- }
-
- private boolean isValidSystemStreamName(String name) {
- return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches();
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java
new file mode 100644
index 0000000..7957fd3
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.SideInputsProcessor;
+
+
+/**
+ * Table descriptor for store backed tables.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>>
+ extends BaseTableDescriptor<K, V, D> {
+
+ static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
+ static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
+ static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
+
+ protected List<String> sideInputs;
+ protected SideInputsProcessor sideInputsProcessor;
+ protected boolean enableChangelog;
+ protected String changelogStream;
+ protected Integer changelogReplicationFactor;
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ */
+ public BaseLocalStoreBackedTableDescriptor(String tableId) {
+ super(tableId);
+ }
+
+ /**
+ * Constructs a table descriptor instance
+ * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+ * @param serde the serde for key and value
+ */
+ public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) {
+ super(tableId, serde);
+ }
+
+ public D withSideInputs(List<String> sideInputs) {
+ this.sideInputs = sideInputs;
+ // Disable changelog
+ this.enableChangelog = false;
+ this.changelogStream = null;
+ this.changelogReplicationFactor = null;
+ return (D) this;
+ }
+
+ public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
+ this.sideInputsProcessor = sideInputsProcessor;
+ return (D) this;
+ }
+
+ /**
+ * Enable changelog for this table, by default changelog is disabled. When the
+ * changelog stream name is not specified, it is automatically generated in
+ * the format {@literal [job-name]-[job-id]-table-[table-id]}.
+ * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
+ *
+ * @return this table descriptor instance
+ */
+ public D withChangelogEnabled() {
+ this.enableChangelog = true;
+ return (D) this;
+ }
+
+ /**
+ * Samza stores are local to a container. If the container fails, the contents of
+ * the store are lost. To prevent loss of data, you need to set this property to
+ * configure a changelog stream: Samza then ensures that writes to the store are
+ * replicated to this stream, and the store is restored from this stream after a
+ * failure. The value of this property is given in the form system-name.stream-name.
+ * The "system-name" part is optional. If it is omitted you must specify the system
+ * in <code>job.changelog.system</code> config. Any output stream can be used as
+ * changelog, but you must ensure that only one job ever writes to a given changelog
+ * stream (each instance of a job and each store needs its own changelog stream).
+ * <p>
+ * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
+ *
+ * @param changelogStream changelog stream name
+ * @return this table descriptor instance
+ */
+ public D withChangelogStream(String changelogStream) {
+ this.enableChangelog = true;
+ this.changelogStream = changelogStream;
+ return (D) this;
+ }
+
+ /**
+ * The property defines the number of replicas to use for the change log stream.
+ * <p>
+ * Default value is <code>stores.default.changelog.replication.factor</code>.
+ * <p>
+ * Refer to <code>stores.store-name.changelog.replication.factor</code> in Samza configuration guide
+ *
+ * @param replicationFactor replication factor
+ * @return this table descriptor instance
+ */
+ public D withChangelogReplicationFactor(int replicationFactor) {
+ this.enableChangelog = true;
+ this.changelogReplicationFactor = replicationFactor;
+ return (D) this;
+ }
+
+ @Override
+ protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+ super.generateTableSpecConfig(tableSpecConfig);
+
+ tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
+ if (enableChangelog) {
+ if (changelogStream != null) {
+ tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
+ }
+ if (changelogReplicationFactor != null) {
+ tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
+ }
+ }
+ }
+
+ /**
+ * Validate that this table descriptor is constructed properly
+ */
+ protected void validate() {
+ super.validate();
+ if (sideInputs != null || sideInputsProcessor != null) {
+ Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
+ String.format("Invalid side input configuration for table: %s. " +
+ "Both side inputs and the processor must be provided", tableId));
+ }
+ if (!enableChangelog) {
+ Preconditions.checkState(changelogStream == null,
+ String.format("Invalid changelog configuration for table: %s. Changelog " +
+ "must be enabled, when changelog stream name is provided", tableId));
+ Preconditions.checkState(changelogReplicationFactor == null,
+ String.format("Invalid changelog configuration for table: %s. Changelog " +
+ "must be enabled, when changelog replication factor is provided", tableId));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java
new file mode 100644
index 0000000..f5dd71b
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.storage.kv.LocalStoreBackedReadWriteTable;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
+import org.apache.samza.table.utils.SerdeUtils;
+
+
+/**
+ * Base class for tables backed by Samza local stores. The backing stores are
+ * injected during initialization of the table. Since the lifecycle
+ * of the underlying stores are already managed by Samza container,
+ * the table provider will not manage the lifecycle of the backing
+ * stores.
+ */
+abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvider {
+ public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");
+
+ protected KeyValueStore kvStore;
+
+ public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
+ super(tableSpec);
+ }
+
+ @Override
+ public void init(Context context) {
+ super.init(context);
+
+ Preconditions.checkNotNull(this.context, "Must specify context for local tables.");
+
+ kvStore = (KeyValueStore) this.context.getTaskContext().getStore(tableSpec.getId());
+
+ if (kvStore == null) {
+ throw new SamzaException(String.format(
+ "Backing store for table %s was not injected by SamzaContainer", tableSpec.getId()));
+ }
+
+ logger.info("Initialized backing store for table " + tableSpec.getId());
+ }
+
+ @Override
+ public Table getTable() {
+ if (kvStore == null) {
+ throw new SamzaException("Store not initialized for table " + tableSpec.getId());
+ }
+ ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
+ table.init(this.context);
+ return table;
+ }
+
+ protected Map<String, String> generateCommonStoreConfig(Config jobConfig, Map<String, String> generatedConfig) {
+
+ Map<String, String> storeConfig = new HashMap<>();
+
+ // serde configurations for tables are generated at top level by JobNodeConfigurationGenerator and are included
+ // in the global jobConfig. generatedConfig has all table specific configuration generated from TableSpec, such
+ // as TableProviderFactory, sideInputs, etc.
+ // Merge the global jobConfig and generatedConfig to get full access to configuration needed to create local
+ // store configuration
+ Map<String, String> mergedConfigMap = new HashMap<>(jobConfig);
+ mergedConfigMap.putAll(generatedConfig);
+ JobConfig mergedJobConfig = new JobConfig(new MapConfig(mergedConfigMap));
+ JavaTableConfig tableConfig = new JavaTableConfig(mergedJobConfig);
+
+ String keySerde = tableConfig.getKeySerde(tableSpec.getId());
+ storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde);
+
+ String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
+ storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
+
+ List<String> sideInputs = tableSpec.getSideInputs();
+ if (sideInputs != null && !sideInputs.isEmpty()) {
+ sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
+ "Side input stream %s doesn't confirm to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
+ String formattedSideInputs = String.join(",", sideInputs);
+ storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs);
+ storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()),
+ SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor()));
+ }
+
+ // Changelog configuration
+ Boolean enableChangelog = Boolean.valueOf(
+ tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
+ if (enableChangelog) {
+ String changelogStream = tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM);
+ if (StringUtils.isEmpty(changelogStream)) {
+ changelogStream = String.format("%s-%s-table-%s", mergedJobConfig.getName().get(), mergedJobConfig.getJobId(),
+ tableSpec.getId());
+ }
+
+ Preconditions.checkState(isValidSystemStreamName(changelogStream), String.format(
+ "Changelog stream %s doesn't confirm to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN));
+ storeConfig.put(String.format(StorageConfig.CHANGELOG_STREAM(), tableSpec.getId()), changelogStream);
+
+ String changelogReplicationFactor = tableSpec.getConfig().get(
+ BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR);
+ if (changelogReplicationFactor != null) {
+ storeConfig.put(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), tableSpec.getId()),
+ changelogReplicationFactor);
+ }
+ }
+
+ return storeConfig;
+ }
+
+ @Override
+ public void close() {
+ logger.info("Shutting down table provider for table " + tableSpec.getId());
+ }
+
+ private boolean isValidSystemStreamName(String name) {
+ return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches();
+ }
+}