You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:43 UTC

[03/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java
new file mode 100644
index 0000000..0b48e6b
--- /dev/null
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/descriptors/TestInMemoryTableProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestInMemoryTableProvider {
+  @Test
+  public void testGenerateConfig() {
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    tableSpecConfig.put("inmemory.c1", "c1-value"); // "inmemory."-prefixed: asserted under stores.t1.* below
+    tableSpecConfig.put("inmemory.c2", "c2-value");
+    tableSpecConfig.put("c3", "c3-value"); // unprefixed: asserted under tables.t1.* below
+    tableSpecConfig.put("c4", "c4-value");
+
+    TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
+        "my-table-provider-factory", tableSpecConfig);
+
+    Map<String, String> generatedConfig = new HashMap<>(); // table-level serde entries passed to generateConfig
+    generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    TableProvider tableProvider = new InMemoryTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
+
+    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1"))); // table serdes -> store serdes
+    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+    Assert.assertEquals(
+        InMemoryKeyValueStorageEngineFactory.class.getName(),
+        tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+    Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1")); // "inmemory." prefix replaced by the store prefix
+    Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+    Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3")); // other keys land under the table prefix
+    Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
deleted file mode 100644
index 50bc2c2..0000000
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Table descriptor for RocksDb backed tables
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, RocksDbTableDescriptor<K, V>> {
-
-  static final public String WRITE_BATCH_SIZE = "write.batch.size";
-  static final public String OBJECT_CACHE_SIZE = "object.cache.size";
-  static final public String CONTAINER_CACHE_SIZE_BYTES = "container.cache.size.bytes";
-  static final public String CONTAINER_WRITE_BUFFER_SIZE_BYTES = "container.write.buffer.size.bytes";
-  static final public String ROCKSDB_COMPRESSION = "rocksdb.compression";
-  static final public String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
-  static final public String ROCKSDB_TTL_MS = "rocksdb.ttl.ms";
-  static final public String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
-  static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
-  static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
-  static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
-
-  private Integer writeBatchSize;
-  private Integer objectCacheSize;
-  private Integer cacheSize;
-  private Integer writeBufferSize;
-  private Integer blockSize;
-  private Integer ttl;
-  private Integer numWriteBuffers;
-  private Integer maxLogFileSize;
-  private Integer numLogFilesToKeep;
-  private String compressionType;
-  private String compactionStyle;
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   */
-  public RocksDbTableDescriptor(String tableId) {
-    super(tableId);
-  }
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   * @param serde the serde for key and value
-   */
-  public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) {
-    super(tableId, serde);
-  }
-
-  /**
-   * For better write performance, the storage engine buffers writes and applies them to the
-   * underlying store in a batch. If the same key is written multiple times in quick succession,
-   * this buffer also deduplicates writes to the same key. This property is set to the number
-   * of key/value pairs that should be kept in this in-memory buffer, per task instance.
-   * The number cannot be greater than {@link #withObjectCacheSize}.
-   * <p>
-   * Default value is 500.
-   * <p>
-   * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
-   *
-   * @param writeBatchSize write batch size
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor withWriteBatchSize(int writeBatchSize) {
-    this.writeBatchSize = writeBatchSize;
-    return this;
-  }
-
-  /**
-   * Samza maintains an additional cache in front of RocksDB for frequently-accessed objects.
-   * This cache contains deserialized objects (avoiding the deserialization overhead on cache
-   * hits), in contrast to the RocksDB block cache ({@link #withCacheSize}), which caches
-   * serialized objects. This property determines the number of objects to keep in Samza's
-   * cache, per task instance. This same cache is also used for write buffering
-   * (see {@link #withWriteBatchSize}). A value of 0 disables all caching and batching.
-   * <p>
-   * Default value is 1,000.
-   * <p>
-   * Refer to <code>stores.store-name.object.cache.size</code> in Samza configuration guide
-   *
-   * @param objectCacheSize the object cache size
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor withObjectCacheSize(int objectCacheSize) {
-    this.objectCacheSize = objectCacheSize;
-    return this;
-  }
-
-  /**
-   * The size of RocksDB's block cache in bytes, per container. If there are several task
-   * instances within one container, each is given a proportional share of this cache.
-   * Note that this is an off-heap memory allocation, so the container's total memory
-   * use is the maximum JVM heap size plus the size of this cache.
-   * <p>
-   * Default value is 104,857,600.
-   * <p>
-   * Refer to <code>stores.store-name.container.cache.size.bytes</code> in Samza configuration guide
-   *
-   * @param cacheSize the cache size in bytes
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withCacheSize(int cacheSize) {
-    this.cacheSize = cacheSize;
-    return this;
-  }
-
-  /**
-   * The amount of memory (in bytes) that RocksDB uses for buffering writes before they are
-   * written to disk, per container. If there are several task instances within one container,
-   * each is given a proportional share of this buffer. This setting also determines the
-   * size of RocksDB's segment files.
-   * <p>
-   * Default value is 33,554,432.
-   * <p>
-   * Refer to <code>stores.store-name.container.write.buffer.size.bytes</code> in Samza configuration guide
-   *
-   * @param writeBufferSize the write buffer size in bytes
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withWriteBufferSize(int writeBufferSize) {
-    this.writeBufferSize = writeBufferSize;
-    return this;
-  }
-
-  /**
-   * Controls whether RocksDB should compress data on disk and in the block cache.
-   * The following values are valid:
-   * <ul>
-   *   <li><b>snappy</b> Compress data using the <a href="https://code.google.com/p/snappy/">Snappy</a> codec.
-   *   <li><b>bzip2</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Bzip2">bzip2</a> codec.
-   *   <li><b>zlib</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Zlib">zlib</a> codec.
-   *   <li><b>lz4</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4</a> codec.
-   *   <li><b>lz4hc</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4hc</a> (high compression) codec.
-   *   <li><b>none</b> Do not compress data.
-   * </ul>
-   * <p>
-   * Default value is snappy.
-   * <p>
-   * Refer to <code>stores.store-name.rocksdb.compression</code> in Samza configuration guide
-   *
-   * @param compressionType the compression type
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withCompressionType(String compressionType) {
-    this.compressionType = compressionType;
-    return this;
-  }
-
-  /**
-   * If compression is enabled, RocksDB groups approximately this many uncompressed
-   * bytes into one compressed block. You probably don't need to change this property.
-   * <p>
-   * Default value is 4,096.
-   * <p>
-   * Refer to <code>stores.store-name.rocksdb.block.size.bytes</code> in Samza configuration guide
-   *
-   * @param blockSize the block size in bytes
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withBlockSize(int blockSize) {
-    this.blockSize = blockSize;
-    return this;
-  }
-
-  /**
-   * The time-to-live of the store. Please note it's not a strict TTL limit (removed
-   * only after compaction). Please use caution opening a database with and without
-   * TTL, as it might corrupt the database. Please make sure to read the
-   * <a href="https://github.com/facebook/rocksdb/wiki/Time-to-Live">constraints</a>
-   * before using.
-   * <p>
-   * Refer to <code>stores.store-name.rocksdb.ttl.ms</code> in Samza configuration guide
-   *
-   * @param ttl the time to live in milliseconds
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withTtl(int ttl) {
-    this.ttl = ttl;
-    return this;
-  }
-
-  /**
-   * This property controls the compaction style that RocksDB will employ when compacting
-   * its levels. The following values are valid:
-   * <ul>
-   *   <li><b>universal</b> Use <a href="https://github.com/facebook/rocksdb/wiki/Universal-Compaction">universal</a> compaction.
-   *   <li><b>fifo</b> Use <a href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> compaction.
-   *   <li><b>level</b> Use RocksDB's standard leveled compaction.
-   * </ul>
-   * <p>
-   * Default value is universal.
-   * <p>
-   * Refer to <code>stores.store-name.rocksdb.compaction.style</code> in Samza configuration guide
-   *
-   * @param compactionStyle the compaction style
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withCompactionStyle(String compactionStyle) {
-    this.compactionStyle = compactionStyle;
-    return this;
-  }
-
-  /**
-   * Configures the
-   * <a href="https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer">
-   * number of write buffers</a> that a RocksDB store uses. This allows RocksDB
-   * to continue taking writes to other buffers even while a given write buffer is being
-   * flushed to disk.
-   * <p>
-   * Default value is 3.
-   * <p>
-   * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in Samza configuration guide
-   *
-   * @param numWriteBuffers the number of write buffers
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withNumWriteBuffers(int numWriteBuffers) {
-    this.numWriteBuffers = numWriteBuffers;
-    return this;
-  }
-
-  /**
-   * The maximum size in bytes of the RocksDB LOG file before it is rotated.
-   * <p>
-   * Default value is 67,108,864.
-   * <p>
-   * Refer to <code>stores.store-name.rocksdb.max.log.file.size.bytes</code> in Samza configuration guide
-   *
-   * @param maxLogFileSize the maximal log file size in bytes
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withMaxLogFileSize(int maxLogFileSize) {
-    this.maxLogFileSize = maxLogFileSize;
-    return this;
-  }
-
-  /**
-   * The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
-   * <p>
-   * Default value is 2.
-   * <p>
-   * Refer to <code>stores.store-name.rocksdb.keep.log.file.num</code> in Samza configuration guide
-   *
-   * @param numLogFilesToKeep the number of log files to keep
-   * @return this table descriptor instance
-   */
-  public RocksDbTableDescriptor<K, V> withNumLogFilesToKeep(int numLogFilesToKeep) {
-    this.numLogFilesToKeep = numLogFilesToKeep;
-    return this;
-  }
-
-  /**
-   * Create a table spec based on this table description
-   * @return the table spec
-   */
-  @Override
-  public TableSpec getTableSpec() {
-
-    validate();
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    generateTableSpecConfig(tableSpecConfig);
-
-    return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig,
-        sideInputs, sideInputsProcessor);
-  }
-
-  @Override
-  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
-
-    super.generateTableSpecConfig(tableSpecConfig);
-
-    if (writeBatchSize != null) {
-      addRocksDbConfig(tableSpecConfig, WRITE_BATCH_SIZE, writeBatchSize.toString());
-    }
-    if (objectCacheSize != null) {
-      addRocksDbConfig(tableSpecConfig, OBJECT_CACHE_SIZE, objectCacheSize.toString());
-    }
-    if (cacheSize != null) {
-      addRocksDbConfig(tableSpecConfig, CONTAINER_CACHE_SIZE_BYTES, cacheSize.toString());
-    }
-    if (writeBufferSize != null) {
-      addRocksDbConfig(tableSpecConfig, CONTAINER_WRITE_BUFFER_SIZE_BYTES, writeBufferSize.toString());
-    }
-    if (compressionType != null) {
-      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPRESSION, compressionType);
-    }
-    if (blockSize != null) {
-      addRocksDbConfig(tableSpecConfig, ROCKSDB_BLOCK_SIZE_BYTES, blockSize.toString());
-    }
-    if (ttl != null) {
-      addRocksDbConfig(tableSpecConfig, ROCKSDB_TTL_MS, ttl.toString());
-    }
-    if (compactionStyle != null) {
-      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPACTION_STYLE, compactionStyle);
-    }
-    if (numWriteBuffers != null) {
-      addRocksDbConfig(tableSpecConfig, ROCKSDB_NUM_WRITE_BUFFERS, numWriteBuffers.toString());
-    }
-    if (maxLogFileSize != null) {
-      addRocksDbConfig(tableSpecConfig, ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, maxLogFileSize.toString());
-    }
-    if (numLogFilesToKeep != null) {
-      addRocksDbConfig(tableSpecConfig, ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString());
-    }
-  }
-
-  private void addRocksDbConfig(Map<String, String> map, String key, String value) {
-    map.put("rocksdb." + key, value);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
deleted file mode 100644
index df60a5a..0000000
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.config.ClusterManagerConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Table provider for tables backed by RocksDb.
- */
-public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider {
-
-  public RocksDbTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
-  }
-
-  @Override
-  public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
-
-    Map<String, String> tableConfig = new HashMap<>();
-
-    // Store factory configuration
-    tableConfig.put(String.format(
-        StorageConfig.FACTORY(), tableSpec.getId()),
-        RocksDbKeyValueStorageEngineFactory.class.getName());
-
-    // Common store configuration
-    tableConfig.putAll(generateCommonStoreConfig(jobConfig, generatedConfig));
-
-    // Rest of the configuration
-    tableSpec.getConfig().entrySet().stream()
-        .filter(e -> !e.getKey().startsWith("internal."))
-        .forEach(e -> {
-          String k = e.getKey();
-          String v = e.getValue();
-          String realKey = k.startsWith("rocksdb.")
-              ? String.format("stores.%s", tableSpec.getId()) + "." + k.substring("rocksdb.".length())
-              : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
-          tableConfig.put(realKey, v);
-        });
-
-    // Enable host affinity
-    tableConfig.put(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, "true");
-
-    logger.info("Generated configuration for table " + tableSpec.getId());
-
-    return tableConfig;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
deleted file mode 100644
index dbe0f97..0000000
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-
-public class RocksDbTableProviderFactory implements TableProviderFactory {
-  @Override
-  public TableProvider getTableProvider(TableSpec tableSpec) {
-    return new RocksDbTableProvider(tableSpec);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
new file mode 100644
index 0000000..a8ede78
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for RocksDb backed tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, RocksDbTableDescriptor<K, V>> {
+
+  static final public String WRITE_BATCH_SIZE = "write.batch.size";
+  static final public String OBJECT_CACHE_SIZE = "object.cache.size";
+  static final public String CONTAINER_CACHE_SIZE_BYTES = "container.cache.size.bytes";
+  static final public String CONTAINER_WRITE_BUFFER_SIZE_BYTES = "container.write.buffer.size.bytes";
+  static final public String ROCKSDB_COMPRESSION = "rocksdb.compression";
+  static final public String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
+  static final public String ROCKSDB_TTL_MS = "rocksdb.ttl.ms";
+  static final public String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
+  static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
+  static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
+  static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+
+  private Integer writeBatchSize;
+  private Integer objectCacheSize;
+  private Integer cacheSize;
+  private Integer writeBufferSize;
+  private Integer blockSize;
+  private Integer ttl;
+  private Integer numWriteBuffers;
+  private Integer maxLogFileSize;
+  private Integer numLogFilesToKeep;
+  private String compressionType;
+  private String compactionStyle;
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\d\w-_]+}
+   */
+  public RocksDbTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\d\w-_]+}
+   * @param serde the serde for key and value
+   */
+  public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
+  /**
+   * For better write performance, the storage engine buffers writes and applies them to the
+   * underlying store in a batch. If the same key is written multiple times in quick succession,
+   * this buffer also deduplicates writes to the same key. This property is set to the number
+   * of key/value pairs that should be kept in this in-memory buffer, per task instance.
+   * The number cannot be greater than {@link #withObjectCacheSize}.
+   * <p>
+   * Default value is 500.
+   * <p>
+   * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
+   *
+   * @param writeBatchSize write batch size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withWriteBatchSize(int writeBatchSize) {
+    this.writeBatchSize = writeBatchSize;
+    return this;
+  }
+
+  /**
+   * Samza maintains an additional cache in front of RocksDB for frequently-accessed objects.
+   * This cache contains deserialized objects (avoiding the deserialization overhead on cache
+   * hits), in contrast to the RocksDB block cache ({@link #withCacheSize}), which caches
+   * serialized objects. This property determines the number of objects to keep in Samza's
+   * cache, per task instance. This same cache is also used for write buffering
+   * (see {@link #withWriteBatchSize}). A value of 0 disables all caching and batching.
+   * <p>
+   * Default value is 1,000.
+   * <p>
+   * Refer to <code>stores.store-name.object.cache.size</code> in Samza configuration guide
+   *
+   * @param objectCacheSize the object cache size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withObjectCacheSize(int objectCacheSize) {
+    this.objectCacheSize = objectCacheSize;
+    return this;
+  }
+
+  /**
+   * The size of RocksDB's block cache in bytes, per container. If there are several task
+   * instances within one container, each is given a proportional share of this cache.
+   * Note that this is an off-heap memory allocation, so the container's total memory
+   * use is the maximum JVM heap size plus the size of this cache.
+   * <p>
+   * Default value is 104,857,600.
+   * <p>
+   * Refer to <code>stores.store-name.container.cache.size.bytes</code> in Samza configuration guide
+   *
+   * @param cacheSize the cache size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCacheSize(int cacheSize) {
+    this.cacheSize = cacheSize;
+    return this;
+  }
+
+  /**
+   * The amount of memory (in bytes) that RocksDB uses for buffering writes before they are
+   * written to disk, per container. If there are several task instances within one container,
+   * each is given a proportional share of this buffer. This setting also determines the
+   * size of RocksDB's segment files.
+   * <p>
+   * Default value is 33,554,432.
+   * <p>
+   * Refer to <code>stores.store-name.container.write.buffer.size.bytes</code> in Samza configuration guide
+   *
+   * @param writeBufferSize the write buffer size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withWriteBufferSize(int writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  /**
+   * Controls whether RocksDB should compress data on disk and in the block cache.
+   * The following values are valid:
+   * <ul>
+   *   <li><b>snappy</b> Compress data using the <a href="https://code.google.com/p/snappy/">Snappy</a> codec.
+   *   <li><b>bzip2</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Bzip2">bzip2</a> codec.
+   *   <li><b>zlib</b> Compress data using the <a href="http://en.wikipedia.org/wiki/Zlib">zlib</a> codec.
+   *   <li><b>lz4</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4</a> codec.
+   *   <li><b>lz4hc</b> Compress data using the <a href="https://code.google.com/p/lz4/">lz4hc</a> (high compression) codec.
+   *   <li><b>none</b> Do not compress data.
+   * </ul>
+   * <p>
+   * Default value is snappy.
+   * <p>
+   * Refer to <code>stores.store-name.rocksdb.compression</code> in Samza configuration guide
+   *
+   * @param compressionType the compression type
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCompressionType(String compressionType) {
+    this.compressionType = compressionType;
+    return this;
+  }
+
+  /**
+   * If compression is enabled, RocksDB groups approximately this many uncompressed
+   * bytes into one compressed block. You probably don't need to change this property.
+   * <p>
+   * Default value is 4,096.
+   * <p>
+   * Refer to <code>stores.store-name.rocksdb.block.size.bytes</code> in Samza configuration guide
+   *
+   * @param blockSize the block size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withBlockSize(int blockSize) {
+    this.blockSize = blockSize;
+    return this;
+  }
+
+  /**
+   * The time-to-live of the store. Please note it's not a strict TTL limit (removed
+   * only after compaction). Please use caution opening a database with and without
+   * TTL, as it might corrupt the database. Please make sure to read the
+   * <a href="https://github.com/facebook/rocksdb/wiki/Time-to-Live">constraints</a>
+   * before using.
+   * <p>
+   * Refer to {@code stores.store-name.rocksdb.ttl.ms} in the Samza configuration guide.
+   *
+   * @param ttl the time to live in milliseconds (an {@code int}, so at most about 24.8 days)
+   * @return this table descriptor instance, so that calls can be chained
+   */
+  public RocksDbTableDescriptor<K, V> withTtl(int ttl) {
+    this.ttl = ttl;
+    return this;
+  }
+
+  /**
+   * This property controls the compaction style that RocksDB will employ when compacting
+   * its levels. The following values are valid:
+   * <ul>
+   *   <li><b>universal</b> Use <a href="https://github.com/facebook/rocksdb/wiki/Universal-Compaction">universal</a> compaction.
+   *   <li><b>fifo</b> Use <a href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> compaction.
+   *   <li><b>level</b> Use RocksDB's standard leveled compaction.
+   * </ul>
+   * <p>
+   * Default value is universal.
+   * <p>
+   * Refer to {@code stores.store-name.rocksdb.compaction.style} in the Samza configuration guide.
+   *
+   * @param compactionStyle the compaction style; this method stores the string exactly as given
+   * @return this table descriptor instance, so that calls can be chained
+   */
+  public RocksDbTableDescriptor<K, V> withCompactionStyle(String compactionStyle) {
+    this.compactionStyle = compactionStyle;
+    return this;
+  }
+
+  /**
+   * Configures the
+   * <a href="https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer">
+   * number of write buffers</a> that a RocksDB store uses. This allows RocksDB
+   * to continue taking writes to other buffers even while a given write buffer is being
+   * flushed to disk.
+   * <p>
+   * Default value is 3.
+   * <p>
+   * Refer to {@code stores.store-name.rocksdb.num.write.buffers} in the Samza configuration guide.
+   *
+   * @param numWriteBuffers the number of write buffers
+   * @return this table descriptor instance, so that calls can be chained
+   */
+  public RocksDbTableDescriptor<K, V> withNumWriteBuffers(int numWriteBuffers) {
+    this.numWriteBuffers = numWriteBuffers;
+    return this;
+  }
+
+  /**
+   * The maximum size in bytes of the RocksDB LOG file before it is rotated.
+   * <p>
+   * Default value is 67,108,864 (64 MiB).
+   * <p>
+   * Refer to {@code stores.store-name.rocksdb.max.log.file.size.bytes} in the Samza configuration guide.
+   *
+   * @param maxLogFileSize the maximum log file size in bytes
+   * @return this table descriptor instance, so that calls can be chained
+   */
+  public RocksDbTableDescriptor<K, V> withMaxLogFileSize(int maxLogFileSize) {
+    this.maxLogFileSize = maxLogFileSize;
+    return this;
+  }
+
+  /**
+   * The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
+   * <p>
+   * Default value is 2.
+   * <p>
+   * Refer to {@code stores.store-name.rocksdb.keep.log.file.num} in the Samza configuration guide.
+   *
+   * @param numLogFilesToKeep the number of log files to keep
+   * @return this table descriptor instance, so that calls can be chained
+   */
+  public RocksDbTableDescriptor<K, V> withNumLogFilesToKeep(int numLogFilesToKeep) {
+    this.numLogFilesToKeep = numLogFilesToKeep;
+    return this;
+  }
+
+  /**
+   * Validates this descriptor and converts it into a {@link TableSpec}.
+   * @return the table spec built from this descriptor
+   */
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    final Map<String, String> specConfig = new HashMap<>();
+    generateTableSpecConfig(specConfig);
+
+    final String providerFactoryClassName = RocksDbTableProviderFactory.class.getName();
+    return new TableSpec(tableId, serde, providerFactoryClassName, specConfig,
+        sideInputs, sideInputsProcessor);
+  }
+
+  /**
+   * Adds the RocksDb options held by this descriptor to the given table spec config,
+   * after the superclass has added its own entries. Options whose field is still
+   * {@code null} are not written at all.
+   *
+   * @param tableSpecConfig the map that receives the generated entries
+   */
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+
+    addRocksDbConfigIfSet(tableSpecConfig, WRITE_BATCH_SIZE, writeBatchSize);
+    addRocksDbConfigIfSet(tableSpecConfig, OBJECT_CACHE_SIZE, objectCacheSize);
+    addRocksDbConfigIfSet(tableSpecConfig, CONTAINER_CACHE_SIZE_BYTES, cacheSize);
+    addRocksDbConfigIfSet(tableSpecConfig, CONTAINER_WRITE_BUFFER_SIZE_BYTES, writeBufferSize);
+    addRocksDbConfigIfSet(tableSpecConfig, ROCKSDB_COMPRESSION, compressionType);
+    addRocksDbConfigIfSet(tableSpecConfig, ROCKSDB_BLOCK_SIZE_BYTES, blockSize);
+    addRocksDbConfigIfSet(tableSpecConfig, ROCKSDB_TTL_MS, ttl);
+    addRocksDbConfigIfSet(tableSpecConfig, ROCKSDB_COMPACTION_STYLE, compactionStyle);
+    addRocksDbConfigIfSet(tableSpecConfig, ROCKSDB_NUM_WRITE_BUFFERS, numWriteBuffers);
+    addRocksDbConfigIfSet(tableSpecConfig, ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, maxLogFileSize);
+    addRocksDbConfigIfSet(tableSpecConfig, ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep);
+  }
+
+  /**
+   * Writes one option through {@link #addRocksDbConfig} unless it was never set.
+   * <p>
+   * The value is converted with {@code toString()}, which leaves a {@code String}
+   * unchanged and renders the other option values as text.
+   *
+   * @param config the map that receives the entry
+   * @param key the option name, without the "rocksdb." prefix
+   * @param value the option value, or {@code null} if the option is unset
+   */
+  private void addRocksDbConfigIfSet(Map<String, String> config, String key, Object value) {
+    if (value != null) {
+      addRocksDbConfig(config, key, value.toString());
+    }
+  }
+
+  private void addRocksDbConfig(Map<String, String> map, String key, String value) {
+    map.put("rocksdb." + key, value); // prefix that RocksDbTableProvider remaps to "stores.<table-id>."
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java
new file mode 100644
index 0000000..e0c3355
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Generates the job configuration for tables that are stored in RocksDb.
+ */
+public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider {
+
+  public RocksDbTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  /**
+   * Builds the configuration entries for this table: the RocksDb store factory, the common
+   * store configuration, the remapped table spec entries, and the host affinity flag.
+   */
+  @Override
+  public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
+    final String tableId = tableSpec.getId();
+    final Map<String, String> result = new HashMap<>();
+
+    // Store factory first, then the configuration common to the local stores
+    result.put(String.format(StorageConfig.FACTORY(), tableId),
+        RocksDbKeyValueStorageEngineFactory.class.getName());
+    result.putAll(generateCommonStoreConfig(jobConfig, generatedConfig));
+
+    // "internal." keys are dropped, "rocksdb." keys go to the store, the rest go to the table
+    for (Map.Entry<String, String> entry : tableSpec.getConfig().entrySet()) {
+      final String key = entry.getKey();
+      if (key.startsWith("internal.")) {
+        continue;
+      }
+      final String realKey;
+      if (key.startsWith("rocksdb.")) {
+        realKey = String.format("stores.%s", tableId) + "." + key.substring("rocksdb.".length());
+      } else {
+        realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + "." + key;
+      }
+      result.put(realKey, entry.getValue());
+    }
+
+    result.put(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, "true");
+    logger.info("Generated configuration for table " + tableId);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java
new file mode 100644
index 0000000..74e74db
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableProviderFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.descriptors.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/** Creates a {@link RocksDbTableProvider} for a given {@link TableSpec}. */
+public class RocksDbTableProviderFactory implements TableProviderFactory {
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    return new RocksDbTableProvider(tableSpec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
deleted file mode 100644
index cd7e85c..0000000
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import junit.framework.Assert;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-
-public class TestRocksDbTableDescriptor {
-
-  @Test
-  public void testMinimal() {
-    new RocksDbTableDescriptor<Integer, String>("1")
-        .validate();
-  }
-
-  @Test
-  public void testSerde() {
-    TableSpec tableSpec = new RocksDbTableDescriptor("1",
-            KVSerde.of(new IntegerSerde(), new StringSerde()))
-        .getTableSpec();
-    Assert.assertNotNull(tableSpec.getSerde());
-    Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
-    Assert.assertEquals(tableSpec.getSerde().getValueSerde().getClass(), StringSerde.class);
-  }
-
-  @Test
-  public void testTableSpec() {
-
-    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
-            KVSerde.of(new IntegerSerde(), new StringSerde()))
-        .withBlockSize(1)
-        .withCacheSize(2)
-        .withCompactionStyle("fifo")
-        .withCompressionType("snappy")
-        .withMaxLogFileSize(3)
-        .withNumLogFilesToKeep(4)
-        .withNumWriteBuffers(5)
-        .withObjectCacheSize(6)
-        .withTtl(7)
-        .withWriteBatchSize(8)
-        .withWriteBufferSize(9)
-        .withConfig("rocksdb.abc", "xyz")
-        .getTableSpec();
-
-    Assert.assertNotNull(tableSpec.getSerde());
-    Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
-    Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
-    Assert.assertEquals("1", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES));
-    Assert.assertEquals("2", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES));
-    Assert.assertEquals("3", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES));
-    Assert.assertEquals("4", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM));
-    Assert.assertEquals("5", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS));
-    Assert.assertEquals("6", getConfig(tableSpec, RocksDbTableDescriptor.OBJECT_CACHE_SIZE));
-    Assert.assertEquals("7", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_TTL_MS));
-    Assert.assertEquals("8", getConfig(tableSpec, RocksDbTableDescriptor.WRITE_BATCH_SIZE));
-    Assert.assertEquals("9", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES));
-    Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION));
-    Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE));
-    Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
-    Assert.assertEquals("false", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
-  }
-
-  @Test
-  public void testTableSpecWithChangelogEnabled() {
-
-    TableSpec tableSpec = new RocksDbTableDescriptor("1", KVSerde.of(new IntegerSerde(), new StringSerde()))
-        .withChangelogStream("changelog-$tream")
-        .withChangelogReplicationFactor(10)
-        .getTableSpec();
-
-    Assert.assertEquals("10", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR));
-    Assert.assertEquals("changelog-$tream", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM));
-    Assert.assertEquals("true", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
-  }
-
-  private String getConfig(TableSpec tableSpec, String key) {
-    return tableSpec.getConfig().get("rocksdb." + key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
deleted file mode 100644
index 8ce061c..0000000
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-
-public class TestRocksDbTableProvider {
-  @Test
-  public void testGenerateConfig() {
-
-    Map<String, String> tableSpecConfig = new HashMap<>();
-    tableSpecConfig.put("rocksdb.c1", "c1-value");
-    tableSpecConfig.put("rocksdb.c2", "c2-value");
-    tableSpecConfig.put("c3", "c3-value");
-    tableSpecConfig.put("c4", "c4-value");
-
-    TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
-        "my-table-provider-factory", tableSpecConfig);
-
-    Map<String, String> generatedConfig = new HashMap<>();
-    generatedConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
-    generatedConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
-
-    TableProvider tableProvider = new RocksDbTableProvider(tableSpec);
-    Map<String, String> tableConfig = tableProvider.generateConfig(new MapConfig(), generatedConfig);
-
-    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
-    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
-    Assert.assertEquals(
-        RocksDbKeyValueStorageEngineFactory.class.getName(),
-        tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
-    Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
-    Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
-    Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
-    Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
new file mode 100644
index 0000000..86efea5
--- /dev/null
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import junit.framework.Assert;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+/** Checks the {@link TableSpec} that a {@link RocksDbTableDescriptor} produces. */
+public class TestRocksDbTableDescriptor {
+
+  @Test
+  public void testMinimal() {
+    new RocksDbTableDescriptor<Integer, String>("1")
+        .validate();
+  }
+
+  @Test
+  public void testSerde() {
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
+            KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .getTableSpec();
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertEquals(IntegerSerde.class, tableSpec.getSerde().getKeySerde().getClass());
+    Assert.assertEquals(StringSerde.class, tableSpec.getSerde().getValueSerde().getClass());
+  }
+
+  @Test
+  public void testTableSpec() {
+    // Each setter, and a raw "rocksdb."-prefixed withConfig() entry, must appear in the spec config.
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1",
+            KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .withBlockSize(1)
+        .withCacheSize(2)
+        .withCompactionStyle("fifo")
+        .withCompressionType("snappy")
+        .withMaxLogFileSize(3)
+        .withNumLogFilesToKeep(4)
+        .withNumWriteBuffers(5)
+        .withObjectCacheSize(6)
+        .withTtl(7)
+        .withWriteBatchSize(8)
+        .withWriteBufferSize(9)
+        .withConfig("rocksdb.abc", "xyz")
+        .getTableSpec();
+
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
+    Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
+    Assert.assertEquals("1", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES));
+    Assert.assertEquals("2", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES));
+    Assert.assertEquals("3", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES));
+    Assert.assertEquals("4", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM));
+    Assert.assertEquals("5", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS));
+    Assert.assertEquals("6", getConfig(tableSpec, RocksDbTableDescriptor.OBJECT_CACHE_SIZE));
+    Assert.assertEquals("7", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_TTL_MS));
+    Assert.assertEquals("8", getConfig(tableSpec, RocksDbTableDescriptor.WRITE_BATCH_SIZE));
+    Assert.assertEquals("9", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES));
+    Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION));
+    Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE));
+    Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
+    Assert.assertEquals("false", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
+  }
+
+  @Test
+  public void testTableSpecWithChangelogEnabled() {
+    // Setting a changelog stream is expected to switch the internal changelog flag to "true".
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1", KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .withChangelogStream("changelog-$tream")
+        .withChangelogReplicationFactor(10)
+        .getTableSpec();
+
+    Assert.assertEquals("10", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR));
+    Assert.assertEquals("changelog-$tream", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM));
+    Assert.assertEquals("true", tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
+  }
+
+  private String getConfig(TableSpec tableSpec, String key) {
+    return tableSpec.getConfig().get("rocksdb." + key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java
new file mode 100644
index 0000000..5e4601d
--- /dev/null
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.table.descriptors.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestRocksDbTableProvider {
+  @Test
+  public void testGenerateConfig() {
+    // Serde entries handed to the provider as already-generated configuration
+    Map<String, String> serdeConfig = new HashMap<>();
+    serdeConfig.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    serdeConfig.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    // Two "rocksdb."-prefixed entries and two unprefixed ones
+    Map<String, String> specConfig = new HashMap<>();
+    specConfig.put("rocksdb.c1", "c1-value");
+    specConfig.put("rocksdb.c2", "c2-value");
+    specConfig.put("c3", "c3-value");
+    specConfig.put("c4", "c4-value");
+
+    TableSpec spec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
+        "my-table-provider-factory", specConfig);
+
+    TableProvider provider = new RocksDbTableProvider(spec);
+    Map<String, String> actual = provider.generateConfig(new MapConfig(), serdeConfig);
+
+    Assert.assertEquals("ks1", actual.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", actual.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+    String expectedFactory = RocksDbKeyValueStorageEngineFactory.class.getName();
+    Assert.assertEquals(expectedFactory, actual.get(String.format(StorageConfig.FACTORY(), "t1")));
+    Assert.assertEquals("c1-value", actual.get("stores.t1.c1"));
+    Assert.assertEquals("c2-value", actual.get("stores.t1.c2"));
+    Assert.assertEquals("c3-value", actual.get("tables.t1.c3"));
+    Assert.assertEquals("c4-value", actual.get("tables.t1.c4"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
deleted file mode 100644
index 84e5fbe..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.storage.SideInputsProcessor;
-
-
-/**
- * Table descriptor for store backed tables.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- * @param <D> the type of the concrete table descriptor
- */
-abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>>
-    extends BaseTableDescriptor<K, V, D> {
-
-  static final public String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
-  static final public String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
-  static final public String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
-
-  protected List<String> sideInputs;
-  protected SideInputsProcessor sideInputsProcessor;
-  protected boolean enableChangelog;
-  protected String changelogStream;
-  protected Integer changelogReplicationFactor;
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   */
-  public BaseLocalStoreBackedTableDescriptor(String tableId) {
-    super(tableId);
-  }
-
-  /**
-   * Constructs a table descriptor instance
-   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
-   * @param serde the serde for key and value
-   */
-  public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) {
-    super(tableId, serde);
-  }
-
-  public D withSideInputs(List<String> sideInputs) {
-    this.sideInputs = sideInputs;
-    // Disable changelog
-    this.enableChangelog = false;
-    this.changelogStream = null;
-    this.changelogReplicationFactor = null;
-    return (D) this;
-  }
-
-  public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
-    this.sideInputsProcessor = sideInputsProcessor;
-    return (D) this;
-  }
-
-  /**
-   * Enable changelog for this table, by default changelog is disabled. When the
-   * changelog stream name is not specified, it is automatically generated in
-   * the format {@literal [job-name]-[job-id]-table-[table-id]}.
-   * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
-   *
-   * @return this table descriptor instance
-   */
-  public D withChangelogEnabled() {
-    this.enableChangelog = true;
-    return (D) this;
-  }
-
-  /**
-   * Samza stores are local to a container. If the container fails, the contents of
-   * the store are lost. To prevent loss of data, you need to set this property to
-   * configure a changelog stream: Samza then ensures that writes to the store are
-   * replicated to this stream, and the store is restored from this stream after a
-   * failure. The value of this property is given in the form system-name.stream-name.
-   * The "system-name" part is optional. If it is omitted you must specify the system
-   * in <code>job.changelog.system</code> config. Any output stream can be used as
-   * changelog, but you must ensure that only one job ever writes to a given changelog
-   * stream (each instance of a job and each store needs its own changelog stream).
-   * <p>
-   * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
-   *
-   * @param changelogStream changelog stream name
-   * @return this table descriptor instance
-   */
-  public D withChangelogStream(String changelogStream) {
-    this.enableChangelog = true;
-    this.changelogStream = changelogStream;
-    return (D) this;
-  }
-
-  /**
-   * The property defines the number of replicas to use for the change log stream.
-   * <p>
-   * Default value is <code>stores.default.changelog.replication.factor</code>.
-   * <p>
-   * Refer to <code>stores.store-name.changelog.replication.factor</code> in Samza configuration guide
-   *
-   * @param replicationFactor replication factor
-   * @return this table descriptor instance
-   */
-  public D withChangelogReplicationFactor(int replicationFactor) {
-    this.enableChangelog = true;
-    this.changelogReplicationFactor = replicationFactor;
-    return (D) this;
-  }
-
-  @Override
-  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
-    super.generateTableSpecConfig(tableSpecConfig);
-
-    tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
-    if (enableChangelog) {
-      if (changelogStream != null) {
-        tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
-      }
-      if (changelogReplicationFactor != null) {
-        tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
-      }
-    }
-  }
-
-  /**
-   * Validate that this table descriptor is constructed properly
-   */
-  protected void validate() {
-    super.validate();
-    if (sideInputs != null || sideInputsProcessor != null) {
-      Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
-          String.format("Invalid side input configuration for table: %s. " +
-              "Both side inputs and the processor must be provided", tableId));
-    }
-    if (!enableChangelog) {
-      Preconditions.checkState(changelogStream == null,
-          String.format("Invalid changelog configuration for table: %s. Changelog " +
-              "must be enabled, when changelog stream name is provided", tableId));
-      Preconditions.checkState(changelogReplicationFactor == null,
-          String.format("Invalid changelog configuration for table: %s. Changelog " +
-              "must be enabled, when changelog replication factor is provided", tableId));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
deleted file mode 100644
index e56c977..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StorageConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.utils.BaseTableProvider;
-import org.apache.samza.table.utils.SerdeUtils;
-
-
-/**
- * Base class for tables backed by Samza local stores. The backing stores are
- * injected during initialization of the table. Since the lifecycle
- * of the underlying stores are already managed by Samza container,
- * the table provider will not manage the lifecycle of the backing
- * stores.
- */
-abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvider {
-  public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");
-
-  protected KeyValueStore kvStore;
-
-  public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
-    super(tableSpec);
-  }
-
-  @Override
-  public void init(Context context) {
-    super.init(context);
-
-    Preconditions.checkNotNull(this.context, "Must specify context for local tables.");
-
-    kvStore = (KeyValueStore) this.context.getTaskContext().getStore(tableSpec.getId());
-
-    if (kvStore == null) {
-      throw new SamzaException(String.format(
-          "Backing store for table %s was not injected by SamzaContainer", tableSpec.getId()));
-    }
-
-    logger.info("Initialized backing store for table " + tableSpec.getId());
-  }
-
-  @Override
-  public Table getTable() {
-    if (kvStore == null) {
-      throw new SamzaException("Store not initialized for table " + tableSpec.getId());
-    }
-    ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
-    table.init(this.context);
-    return table;
-  }
-
-  protected Map<String, String> generateCommonStoreConfig(Config jobConfig, Map<String, String> generatedConfig) {
-
-    Map<String, String> storeConfig = new HashMap<>();
-
-    // serde configurations for tables are generated at top level by JobNodeConfigurationGenerator and are included
-    // in the global jobConfig. generatedConfig has all table specific configuration generated from TableSpec, such
-    // as TableProviderFactory, sideInputs, etc.
-    // Merge the global jobConfig and generatedConfig to get full access to configuration needed to create local
-    // store configuration
-    Map<String, String> mergedConfigMap = new HashMap<>(jobConfig);
-    mergedConfigMap.putAll(generatedConfig);
-    JobConfig mergedJobConfig = new JobConfig(new MapConfig(mergedConfigMap));
-    JavaTableConfig tableConfig = new JavaTableConfig(mergedJobConfig);
-
-    String keySerde = tableConfig.getKeySerde(tableSpec.getId());
-    storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde);
-
-    String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
-    storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
-
-    List<String> sideInputs = tableSpec.getSideInputs();
-    if (sideInputs != null && !sideInputs.isEmpty()) {
-      sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
-          "Side input stream %s doesn't confirm to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
-      String formattedSideInputs = String.join(",", sideInputs);
-      storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs);
-      storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()),
-          SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor()));
-    }
-
-    // Changelog configuration
-    Boolean enableChangelog = Boolean.valueOf(
-        tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
-    if (enableChangelog) {
-      String changelogStream = tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM);
-      if (StringUtils.isEmpty(changelogStream)) {
-        changelogStream = String.format("%s-%s-table-%s", mergedJobConfig.getName().get(), mergedJobConfig.getJobId(),
-            tableSpec.getId());
-      }
-
-      Preconditions.checkState(isValidSystemStreamName(changelogStream), String.format(
-          "Changelog stream %s doesn't confirm to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN));
-      storeConfig.put(String.format(StorageConfig.CHANGELOG_STREAM(), tableSpec.getId()), changelogStream);
-
-      String changelogReplicationFactor = tableSpec.getConfig().get(
-          BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR);
-      if (changelogReplicationFactor != null) {
-        storeConfig.put(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), tableSpec.getId()),
-            changelogReplicationFactor);
-      }
-    }
-
-    return storeConfig;
-  }
-
-  @Override
-  public void close() {
-    logger.info("Shutting down table provider for table " + tableSpec.getId());
-  }
-
-  private boolean isValidSystemStreamName(String name) {
-    return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java
new file mode 100644
index 0000000..7957fd3
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableDescriptor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.SideInputsProcessor;
+
+
+/**
+ * Table descriptor for store backed tables.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+public abstract class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>>
+    extends BaseTableDescriptor<K, V, D> {
+
+  public static final String INTERNAL_ENABLE_CHANGELOG = "internal.enable.changelog";
+  public static final String INTERNAL_CHANGELOG_STREAM = "internal.changelog.stream";
+  public static final String INTERNAL_CHANGELOG_REPLICATION_FACTOR = "internal.changelog.replication.factor";
+
+  protected List<String> sideInputs;
+  protected SideInputsProcessor sideInputsProcessor;
+  protected boolean enableChangelog;
+  protected String changelogStream;
+  protected Integer changelogReplicationFactor;
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   */
+  public BaseLocalStoreBackedTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
+   * @param serde the serde for key and value
+   */
+  public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) {
+    super(tableId, serde);
+  }
+
+  /**
+   * Sets the side inputs for this table. Calling this method also disables the
+   * changelog and clears any changelog stream name and replication factor that
+   * were set earlier on this descriptor.
+   *
+   * @param sideInputs the side inputs for this table
+   * @return this table descriptor instance
+   */
+  @SuppressWarnings("unchecked")
+  public D withSideInputs(List<String> sideInputs) {
+    this.sideInputs = sideInputs;
+    // Disable changelog
+    this.enableChangelog = false;
+    this.changelogStream = null;
+    this.changelogReplicationFactor = null;
+    return (D) this;
+  }
+
+  /**
+   * Sets the processor for the side inputs of this table. Validation requires the
+   * side inputs and the processor to be provided together.
+   *
+   * @param sideInputsProcessor the side inputs processor
+   * @return this table descriptor instance
+   */
+  @SuppressWarnings("unchecked")
+  public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
+    this.sideInputsProcessor = sideInputsProcessor;
+    return (D) this;
+  }
+
+  /**
+   * Enable changelog for this table, by default changelog is disabled. When the
+   * changelog stream name is not specified, it is automatically generated in
+   * the format {@literal [job-name]-[job-id]-table-[table-id]}.
+   * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
+   *
+   * @return this table descriptor instance
+   */
+  @SuppressWarnings("unchecked")
+  public D withChangelogEnabled() {
+    this.enableChangelog = true;
+    return (D) this;
+  }
+
+  /**
+   * Samza stores are local to a container. If the container fails, the contents of
+   * the store are lost. To prevent loss of data, you need to set this property to
+   * configure a changelog stream: Samza then ensures that writes to the store are
+   * replicated to this stream, and the store is restored from this stream after a
+   * failure. The value of this property is given in the form system-name.stream-name.
+   * The "system-name" part is optional. If it is omitted you must specify the system
+   * in <code>job.changelog.system</code> config. Any output stream can be used as
+   * changelog, but you must ensure that only one job ever writes to a given changelog
+   * stream (each instance of a job and each store needs its own changelog stream).
+   * <p>
+   * Refer to <code>stores.store-name.changelog</code> in Samza configuration guide
+   *
+   * @param changelogStream changelog stream name
+   * @return this table descriptor instance
+   */
+  @SuppressWarnings("unchecked")
+  public D withChangelogStream(String changelogStream) {
+    this.enableChangelog = true;
+    this.changelogStream = changelogStream;
+    return (D) this;
+  }
+
+  /**
+   * The property defines the number of replicas to use for the change log stream.
+   * <p>
+   * Default value is <code>stores.default.changelog.replication.factor</code>.
+   * <p>
+   * Refer to <code>stores.store-name.changelog.replication.factor</code> in Samza configuration guide
+   *
+   * @param replicationFactor replication factor
+   * @return this table descriptor instance
+   */
+  @SuppressWarnings("unchecked")
+  public D withChangelogReplicationFactor(int replicationFactor) {
+    this.enableChangelog = true;
+    this.changelogReplicationFactor = replicationFactor;
+    return (D) this;
+  }
+
+  /**
+   * Adds the changelog settings of this descriptor to the table spec config, after
+   * the entries generated by the super class. The enable flag is always written; the
+   * stream name and the replication factor are written only when changelog is
+   * enabled and the respective value has been set.
+   *
+   * @param tableSpecConfig config map to populate
+   */
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+
+    tableSpecConfig.put(INTERNAL_ENABLE_CHANGELOG, String.valueOf(enableChangelog));
+    if (enableChangelog) {
+      if (changelogStream != null) {
+        tableSpecConfig.put(INTERNAL_CHANGELOG_STREAM, changelogStream);
+      }
+      if (changelogReplicationFactor != null) {
+        tableSpecConfig.put(INTERNAL_CHANGELOG_REPLICATION_FACTOR, String.valueOf(changelogReplicationFactor));
+      }
+    }
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly: side inputs and the
+   * side inputs processor must be provided together, and a changelog stream name or
+   * replication factor may only be present when changelog is enabled.
+   *
+   * @throws IllegalArgumentException if side inputs are empty or only one of side inputs and processor is set
+   * @throws IllegalStateException if changelog is disabled but a stream name or replication factor is set
+   */
+  @Override
+  protected void validate() {
+    super.validate();
+    if (sideInputs != null || sideInputsProcessor != null) {
+      Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
+          String.format("Invalid side input configuration for table: %s. " +
+              "Both side inputs and the processor must be provided", tableId));
+    }
+    if (!enableChangelog) {
+      Preconditions.checkState(changelogStream == null,
+          String.format("Invalid changelog configuration for table: %s. Changelog " +
+              "must be enabled, when changelog stream name is provided", tableId));
+      Preconditions.checkState(changelogReplicationFactor == null,
+          String.format("Invalid changelog configuration for table: %s. Changelog " +
+              "must be enabled, when changelog replication factor is provided", tableId));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java
new file mode 100644
index 0000000..f5dd71b
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/descriptors/BaseLocalStoreBackedTableProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.descriptors;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.storage.kv.LocalStoreBackedReadWriteTable;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.utils.descriptors.BaseTableProvider;
+import org.apache.samza.table.utils.SerdeUtils;
+
+
+/**
+ * Base class for tables backed by Samza local stores. The backing stores are
+ * injected during initialization of the table. Since the lifecycle
+ * of the underlying stores are already managed by Samza container,
+ * the table provider will not manage the lifecycle of the backing
+ * stores.
+ */
+public abstract class BaseLocalStoreBackedTableProvider extends BaseTableProvider {
+  /** Pattern that side input and changelog stream names are checked against. */
+  public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+");
+
+  /** Backing store of the table; assigned in {@link #init(Context)}. */
+  protected KeyValueStore kvStore;
+
+  public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  /**
+   * Looks up the backing store for this table from the task context, using the table
+   * id as the store name.
+   *
+   * @param context context passed on to the super class
+   * @throws SamzaException if no store is found under the table id
+   */
+  @Override
+  public void init(Context context) {
+    super.init(context);
+
+    Preconditions.checkNotNull(this.context, "Must specify context for local tables.");
+
+    kvStore = (KeyValueStore) this.context.getTaskContext().getStore(tableSpec.getId());
+
+    if (kvStore == null) {
+      throw new SamzaException(String.format(
+          "Backing store for table %s was not injected by SamzaContainer", tableSpec.getId()));
+    }
+
+    logger.info("Initialized backing store for table " + tableSpec.getId());
+  }
+
+  /**
+   * Creates a {@link LocalStoreBackedReadWriteTable} on top of the backing store and
+   * initializes it with the context of this provider.
+   *
+   * @return the initialized table
+   * @throws SamzaException if the backing store has not been initialized
+   */
+  @Override
+  public Table getTable() {
+    if (kvStore == null) {
+      throw new SamzaException("Store not initialized for table " + tableSpec.getId());
+    }
+    ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
+    table.init(this.context);
+    return table;
+  }
+
+  /**
+   * Generates the store configuration shared by tables backed by local stores: key and
+   * message serdes, side inputs (when present) and changelog settings (when enabled).
+   * If changelog is enabled without a stream name, the name is generated in the format
+   * {@literal [job-name]-[job-id]-table-[table-id]}.
+   *
+   * @param jobConfig the job config
+   * @param generatedConfig generated config, merged on top of {@code jobConfig}
+   * @return the store configuration for this table
+   * @throws IllegalStateException if a side input or changelog stream name does not
+   *         conform to {@link #SYSTEM_STREAM_NAME_PATTERN}
+   */
+  protected Map<String, String> generateCommonStoreConfig(Config jobConfig, Map<String, String> generatedConfig) {
+
+    Map<String, String> storeConfig = new HashMap<>();
+
+    // serde configurations for tables are generated at top level by JobNodeConfigurationGenerator and are included
+    // in the global jobConfig. generatedConfig has all table specific configuration generated from TableSpec, such
+    // as TableProviderFactory, sideInputs, etc.
+    // Merge the global jobConfig and generatedConfig to get full access to configuration needed to create local
+    // store configuration
+    Map<String, String> mergedConfigMap = new HashMap<>(jobConfig);
+    mergedConfigMap.putAll(generatedConfig);
+    JobConfig mergedJobConfig = new JobConfig(new MapConfig(mergedConfigMap));
+    JavaTableConfig tableConfig = new JavaTableConfig(mergedJobConfig);
+
+    String keySerde = tableConfig.getKeySerde(tableSpec.getId());
+    storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde);
+
+    String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
+    storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
+
+    List<String> sideInputs = tableSpec.getSideInputs();
+    if (sideInputs != null && !sideInputs.isEmpty()) {
+      sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
+          "Side input stream %s doesn't conform to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
+      String formattedSideInputs = String.join(",", sideInputs);
+      storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs);
+      storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()),
+          SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor()));
+    }
+
+    // Changelog configuration
+    boolean enableChangelog = Boolean.parseBoolean(
+        tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_ENABLE_CHANGELOG));
+    if (enableChangelog) {
+      String changelogStream = tableSpec.getConfig().get(BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_STREAM);
+      if (StringUtils.isEmpty(changelogStream)) {
+        changelogStream = String.format("%s-%s-table-%s", mergedJobConfig.getName().get(), mergedJobConfig.getJobId(),
+            tableSpec.getId());
+      }
+
+      Preconditions.checkState(isValidSystemStreamName(changelogStream), String.format(
+          "Changelog stream %s doesn't conform to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN));
+      storeConfig.put(String.format(StorageConfig.CHANGELOG_STREAM(), tableSpec.getId()), changelogStream);
+
+      String changelogReplicationFactor = tableSpec.getConfig().get(
+          BaseLocalStoreBackedTableDescriptor.INTERNAL_CHANGELOG_REPLICATION_FACTOR);
+      if (changelogReplicationFactor != null) {
+        storeConfig.put(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), tableSpec.getId()),
+            changelogReplicationFactor);
+      }
+    }
+
+    return storeConfig;
+  }
+
+  /** Only logs the shutdown; the backing store is not closed here. */
+  @Override
+  public void close() {
+    logger.info("Shutting down table provider for table " + tableSpec.getId());
+  }
+
+  /**
+   * Checks a stream name against the allowed pattern.
+   *
+   * @param name stream name to check
+   * @return true if the name is not blank and fully matches {@link #SYSTEM_STREAM_NAME_PATTERN}
+   */
+  private boolean isValidSystemStreamName(String name) {
+    return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches();
+  }
+}