You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/08 22:04:39 UTC
[3/4] samza git commit: SAMZA-1981: Consolidate table descriptors to
samza-api
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
new file mode 100644
index 0000000..6fa040b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for all table provider implementations.
+ */
+abstract public class BaseTableProvider implements TableProvider {
+
+ final protected Logger logger = LoggerFactory.getLogger(getClass());
+
+ final protected TableSpec tableSpec;
+
+ protected Context context;
+
+ public BaseTableProvider(TableSpec tableSpec) {
+ this.tableSpec = tableSpec;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void init(Context context) {
+ this.context = context;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
+ Map<String, String> tableConfig = new HashMap<>();
+
+ // Insert table_id prefix to config entries
+ tableSpec.getConfig().forEach((k, v) -> {
+ String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
+ tableConfig.put(realKey, v);
+ });
+
+ logger.info("Generated configuration for table " + tableSpec.getId());
+
+ return tableConfig;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
index 05454a6..55db637 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -27,8 +27,6 @@ import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.table.descriptors.TableProvider;
-import org.apache.samza.table.descriptors.TableProviderFactory;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.TableImpl;
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index 1d1e8b1..d7b15a4 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -23,8 +23,6 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.context.Context;
-import org.apache.samza.table.descriptors.TableProvider;
-import org.apache.samza.table.descriptors.TableProviderFactory;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.util.Util;
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
new file mode 100644
index 0000000..1a400a4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
+import org.apache.samza.table.caching.guava.GuavaCacheTable;
+import org.apache.samza.table.BaseTableProvider;
+
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Table provider for {@link CachingTable}.
+ */
+public class CachingTableProvider extends BaseTableProvider {
+
+ // Store the cache instances created by default
+ private final List<ReadWriteTable> defaultCaches = new ArrayList<>();
+
+ public CachingTableProvider(TableSpec tableSpec) {
+ super(tableSpec);
+ }
+
+ @Override
+ public Table getTable() {
+ String realTableId = tableSpec.getConfig().get(CachingTableDescriptor.REAL_TABLE_ID);
+ ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
+
+ String cacheTableId = tableSpec.getConfig().get(CachingTableDescriptor.CACHE_TABLE_ID);
+ ReadWriteTable cache;
+
+ if (cacheTableId != null) {
+ cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
+ } else {
+ cache = createDefaultCacheTable(realTableId);
+ defaultCaches.add(cache);
+ }
+
+ boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(CachingTableDescriptor.WRITE_AROUND));
+ CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
+ cachingTable.init(this.context);
+ return cachingTable;
+ }
+
+ @Override
+ public void close() {
+ defaultCaches.forEach(c -> c.close());
+ }
+
+ private ReadWriteTable createDefaultCacheTable(String tableId) {
+ long readTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.READ_TTL_MS, "-1"));
+ long writeTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.WRITE_TTL_MS, "-1"));
+ long cacheSize = Long.parseLong(tableSpec.getConfig().getOrDefault(CachingTableDescriptor.CACHE_SIZE, "-1"));
+
+ CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
+ if (readTtlMs != -1) {
+ cacheBuilder.expireAfterAccess(readTtlMs, TimeUnit.MILLISECONDS);
+ }
+ if (writeTtlMs != -1) {
+ cacheBuilder.expireAfterWrite(writeTtlMs, TimeUnit.MILLISECONDS);
+ }
+ if (cacheSize != -1) {
+ cacheBuilder.maximumSize(cacheSize);
+ }
+
+ logger.info(String.format("Creating default cache with: readTtl=%d, writeTtl=%d, maxSize=%d",
+ readTtlMs, writeTtlMs, cacheSize));
+
+ GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build());
+ cacheTable.init(this.context);
+
+ return cacheTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
new file mode 100644
index 0000000..2ac3694
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProviderFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/**
+ * Table provider factory for {@link org.apache.samza.table.caching.CachingTable}.
+ */
+public class CachingTableProviderFactory implements TableProviderFactory {
+ @Override
+ public TableProvider getTableProvider(TableSpec tableSpec) {
+ return new CachingTableProvider(tableSpec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java
deleted file mode 100644
index a256afc..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.descriptors;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/**
- * Table descriptor for {@link org.apache.samza.table.caching.CachingTable}.
- * @param <K> type of the key in the cache
- * @param <V> type of the value in the cache
- */
-public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
- private Duration readTtl;
- private Duration writeTtl;
- private long cacheSize;
- private TableDescriptor<K, V, ?> cache;
- private TableDescriptor<K, V, ?> table;
- private boolean isWriteAround;
-
- /**
- * Constructs a table descriptor instance with internal cache
- *
- * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
- * @param table target table descriptor
- */
- public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) {
- super(tableId);
- this.table = table;
- }
-
- /**
- * Constructs a table descriptor instance and specify a cache (as Table descriptor)
- * to be used for caching. Cache get is not synchronized with put for better parallelism
- * in the read path of {@link org.apache.samza.table.caching.CachingTable}. As such, cache table implementation is
- * expected to be thread-safe for concurrent accesses.
- *
- * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ }
- * @param table target table descriptor
- * @param cache cache table descriptor
- */
- public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table,
- TableDescriptor<K, V, ?> cache) {
- this(tableId, table);
- this.cache = cache;
- }
-
- @Override
- public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
- return cache != null
- ? Arrays.asList(cache, table)
- : Arrays.asList(table);
- }
-
- @Override
- public TableSpec getTableSpec() {
- validate();
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
-
- if (cache != null) {
- tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
- } else {
- if (readTtl != null) {
- tableSpecConfig.put(CachingTableProvider.READ_TTL_MS, String.valueOf(readTtl.toMillis()));
- }
- if (writeTtl != null) {
- tableSpecConfig.put(CachingTableProvider.WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
- }
- if (cacheSize > 0) {
- tableSpecConfig.put(CachingTableProvider.CACHE_SIZE, String.valueOf(cacheSize));
- }
- }
-
- tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
- tableSpecConfig.put(CachingTableProvider.WRITE_AROUND, String.valueOf(isWriteAround));
-
- return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig);
- }
-
- /**
- * Specify the TTL for each read access, ie. record is expired after
- * the TTL duration since last read access of each key.
- * @param readTtl read TTL
- * @return this descriptor
- */
- public CachingTableDescriptor<K, V> withReadTtl(Duration readTtl) {
- this.readTtl = readTtl;
- return this;
- }
-
- /**
- * Specify the TTL for each write access, ie. record is expired after
- * the TTL duration since last write access of each key.
- * @param writeTtl write TTL
- * @return this descriptor
- */
- public CachingTableDescriptor<K, V> withWriteTtl(Duration writeTtl) {
- this.writeTtl = writeTtl;
- return this;
- }
-
- /**
- * Specify the max cache size for size-based eviction.
- * @param cacheSize max size of the cache
- * @return this descriptor
- */
- public CachingTableDescriptor<K, V> withCacheSize(long cacheSize) {
- this.cacheSize = cacheSize;
- return this;
- }
-
- /**
- * Specify if write-around policy should be used to bypass writing
- * to cache for put operations. This is useful when put() is the
- * dominant operation and get() has no locality with recent puts.
- * @return this descriptor
- */
- public CachingTableDescriptor<K, V> withWriteAround() {
- this.isWriteAround = true;
- return this;
- }
-
- @Override
- @VisibleForTesting
- public void validate() {
- super.validate();
- Preconditions.checkNotNull(table, "Actual table is required.");
- if (cache == null) {
- Preconditions.checkNotNull(readTtl, "readTtl must be specified.");
- } else {
- Preconditions.checkArgument(readTtl == null && writeTtl == null && cacheSize == 0,
- "Invalid to specify both {cache} and {readTtl|writeTtl|cacheSize} at the same time.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java
deleted file mode 100644
index 007f372..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProvider.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.descriptors;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.caching.CachingTable;
-import org.apache.samza.table.caching.guava.GuavaCacheTable;
-import org.apache.samza.table.utils.descriptors.BaseTableProvider;
-
-import com.google.common.cache.CacheBuilder;
-
-/**
- * Table provider for {@link CachingTable}.
- */
-public class CachingTableProvider extends BaseTableProvider {
-
- public static final String REAL_TABLE_ID = "realTableId";
- public static final String CACHE_TABLE_ID = "cacheTableId";
- public static final String READ_TTL_MS = "readTtl";
- public static final String WRITE_TTL_MS = "writeTtl";
- public static final String CACHE_SIZE = "cacheSize";
- public static final String WRITE_AROUND = "writeAround";
-
- // Store the cache instances created by default
- private final List<ReadWriteTable> defaultCaches = new ArrayList<>();
-
- public CachingTableProvider(TableSpec tableSpec) {
- super(tableSpec);
- }
-
- @Override
- public Table getTable() {
- String realTableId = tableSpec.getConfig().get(REAL_TABLE_ID);
- ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
-
- String cacheTableId = tableSpec.getConfig().get(CACHE_TABLE_ID);
- ReadWriteTable cache;
-
- if (cacheTableId != null) {
- cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
- } else {
- cache = createDefaultCacheTable(realTableId);
- defaultCaches.add(cache);
- }
-
- boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(WRITE_AROUND));
- CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
- cachingTable.init(this.context);
- return cachingTable;
- }
-
- @Override
- public void close() {
- defaultCaches.forEach(c -> c.close());
- }
-
- private ReadWriteTable createDefaultCacheTable(String tableId) {
- long readTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(READ_TTL_MS, "-1"));
- long writeTtlMs = Long.parseLong(tableSpec.getConfig().getOrDefault(WRITE_TTL_MS, "-1"));
- long cacheSize = Long.parseLong(tableSpec.getConfig().getOrDefault(CACHE_SIZE, "-1"));
-
- CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
- if (readTtlMs != -1) {
- cacheBuilder.expireAfterAccess(readTtlMs, TimeUnit.MILLISECONDS);
- }
- if (writeTtlMs != -1) {
- cacheBuilder.expireAfterWrite(writeTtlMs, TimeUnit.MILLISECONDS);
- }
- if (cacheSize != -1) {
- cacheBuilder.maximumSize(cacheSize);
- }
-
- logger.info(String.format("Creating default cache with: readTtl=%d, writeTtl=%d, maxSize=%d",
- readTtlMs, writeTtlMs, cacheSize));
-
- GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build());
- cacheTable.init(this.context);
-
- return cacheTable;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java
deleted file mode 100644
index 68eb162..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableProviderFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.descriptors;
-
-import org.apache.samza.table.descriptors.TableProvider;
-import org.apache.samza.table.descriptors.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-/**
- * Table provider factory for {@link org.apache.samza.table.caching.CachingTable}.
- */
-public class CachingTableProviderFactory implements TableProviderFactory {
- @Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- return new CachingTableProvider(tableSpec);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
new file mode 100644
index 0000000..5c9b2af
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.guava;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.BaseTableProvider;
+import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
+import org.apache.samza.table.utils.SerdeUtils;
+
+import com.google.common.cache.Cache;
+
+
+/**
+ * Table provider for {@link GuavaCacheTable}.
+ */
+public class GuavaCacheTableProvider extends BaseTableProvider {
+
+ private List<GuavaCacheTable> guavaTables = new ArrayList<>();
+
+ public GuavaCacheTableProvider(TableSpec tableSpec) {
+ super(tableSpec);
+ }
+
+ @Override
+ public Table getTable() {
+ Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,
+ tableSpec.getConfig().get(GuavaCacheTableDescriptor.GUAVA_CACHE));
+ GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache);
+ table.init(this.context);
+ guavaTables.add(table);
+ return table;
+ }
+
+ @Override
+ public void close() {
+ guavaTables.forEach(t -> t.close());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
new file mode 100644
index 0000000..ac060c4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProviderFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.caching.guava;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/**
+ * Table provider factory for {@link org.apache.samza.table.caching.guava.GuavaCacheTable}.
+ */
+public class GuavaCacheTableProviderFactory implements TableProviderFactory {
+ @Override
+ public TableProvider getTableProvider(TableSpec tableSpec) {
+ return new GuavaCacheTableProvider(tableSpec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java
deleted file mode 100644
index e0224bb..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableDescriptor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.guava.descriptors;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.utils.SerdeUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-
-
-/**
- * Table descriptor for {@link org.apache.samza.table.caching.guava.GuavaCacheTable}.
- * @param <K> type of the key in the cache
- * @param <V> type of the value in the cache
- */
-public class GuavaCacheTableDescriptor<K, V> extends BaseTableDescriptor<K, V, GuavaCacheTableDescriptor<K, V>> {
- private Cache<K, V> cache;
-
- /**
- * {@inheritDoc}
- */
- public GuavaCacheTableDescriptor(String tableId) {
- super(tableId);
- }
-
- @Override
- public TableSpec getTableSpec() {
- validate();
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
-
- tableSpecConfig.put(GuavaCacheTableProvider.GUAVA_CACHE, SerdeUtils.serialize("Guava cache", cache));
-
- return new TableSpec(tableId, serde, GuavaCacheTableProviderFactory.class.getName(), tableSpecConfig);
- }
-
- /**
- * Specify a pre-configured Guava cache instance to be used for caching table.
- * @param cache Guava cache instance
- * @return this descriptor
- */
- public GuavaCacheTableDescriptor withCache(Cache<K, V> cache) {
- this.cache = cache;
- return this;
- }
-
- @Override
- protected void validate() {
- super.validate();
- Preconditions.checkArgument(cache != null, "Must provide a Guava cache instance.");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java
deleted file mode 100644
index 45d1fdc..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProvider.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.guava.descriptors;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.caching.guava.GuavaCacheTable;
-import org.apache.samza.table.utils.descriptors.BaseTableProvider;
-import org.apache.samza.table.utils.SerdeUtils;
-
-import com.google.common.cache.Cache;
-
-
-/**
- * Table provider for {@link GuavaCacheTable}.
- */
-public class GuavaCacheTableProvider extends BaseTableProvider {
-
- public static final String GUAVA_CACHE = "guavaCache";
-
- private List<GuavaCacheTable> guavaTables = new ArrayList<>();
-
- public GuavaCacheTableProvider(TableSpec tableSpec) {
- super(tableSpec);
- }
-
- @Override
- public Table getTable() {
- Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, tableSpec.getConfig().get(GUAVA_CACHE));
- GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache);
- table.init(this.context);
- guavaTables.add(table);
- return table;
- }
-
- @Override
- public void close() {
- guavaTables.forEach(t -> t.close());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java
deleted file mode 100644
index 01228cc..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/descriptors/GuavaCacheTableProviderFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.caching.guava.descriptors;
-
-import org.apache.samza.table.descriptors.TableProvider;
-import org.apache.samza.table.descriptors.TableProviderFactory;
-import org.apache.samza.table.TableSpec;
-
-/**
- * Table provider factory for {@link org.apache.samza.table.caching.guava.GuavaCacheTable}.
- */
-public class GuavaCacheTableProviderFactory implements TableProviderFactory {
- @Override
- public TableProvider getTableProvider(TableSpec tableSpec) {
- return new GuavaCacheTableProvider(tableSpec);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java
deleted file mode 100644
index 15486c7..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseHybridTableDescriptor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.descriptors;
-
-import java.util.List;
-
-/**
- * Base class for hybrid table descriptors. A hybrid table consists of one or more
- * table descriptors, and it orchestrates operations between them to achieve more advanced
- * functionality.
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- * @param <D> the type of this table descriptor
- */
-abstract public class BaseHybridTableDescriptor<K, V, D extends BaseHybridTableDescriptor<K, V, D>>
- extends BaseTableDescriptor<K, V, D> {
-
- /**
- * {@inheritDoc}
- */
- public BaseHybridTableDescriptor(String tableId) {
- super(tableId);
- }
-
- /**
- * Get tables contained within this table.
- * @return list of tables
- */
- abstract public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors();
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
deleted file mode 100644
index 246216b..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.descriptors;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Base class for all table descriptor implementations.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- * @param <D> the type of the concrete table descriptor
- */
-abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
- implements TableDescriptor<K, V, D> {
-
- protected final String tableId;
-
- protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
-
- protected final Map<String, String> config = new HashMap<>();
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- */
- protected BaseTableDescriptor(String tableId) {
- this.tableId = tableId;
- }
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- * @param serde the serde for key and value
- */
- protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
- this.tableId = tableId;
- this.serde = serde;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public D withConfig(String key, String value) {
- config.put(key, value);
- return (D) this;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getTableId() {
- return tableId;
- }
-
- /**
- * Get the serde assigned to this {@link TableDescriptor}
- *
- * @return {@link KVSerde} used by this table
- */
- public KVSerde<K, V> getSerde() {
- return serde;
- }
-
- /**
- * Generate config for {@link TableSpec}; this method is used internally.
- * @param tableSpecConfig configuration for the {@link TableSpec}
- */
- protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
- tableSpecConfig.putAll(config);
- }
-
- /**
- * Validate that this table descriptor is constructed properly; this method is used internally.
- */
- protected void validate() {
- }
-
- /**
- * Create a {@link TableSpec} from this table descriptor; this method is used internally.
- *
- * @return the {@link TableSpec}
- */
- abstract public TableSpec getTableSpec();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
new file mode 100644
index 0000000..4b17e23
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+import org.apache.samza.table.BaseTableProvider;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.util.RateLimiter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * Provide for remote table instances
+ */
+public class RemoteTableProvider extends BaseTableProvider {
+
+
+ private final boolean readOnly;
+ private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
+
+ /**
+ * Map of tableId -> executor service for async table IO and callbacks. The same executors
+ * are shared by both read/write operations such that tables of the same tableId all share
+ * the set same of executors globally whereas table itself is per-task.
+ */
+ private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
+ private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
+ private static ScheduledExecutorService retryExecutor;
+
+ public RemoteTableProvider(TableSpec tableSpec) {
+ super(tableSpec);
+ this.readOnly = !tableSpec.getConfig().containsKey(RemoteTableDescriptor.WRITE_FN);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Table getTable() {
+ RemoteReadableTable table;
+ String tableId = tableSpec.getId();
+
+ TableReadFunction readFn = getReadFn();
+ RateLimiter rateLimiter = deserializeObject(RemoteTableDescriptor.RATE_LIMITER);
+ if (rateLimiter != null) {
+ rateLimiter.init(this.context);
+ }
+ TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(RemoteTableDescriptor.READ_CREDIT_FN);
+ TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
+
+ TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
+ TableRateLimiter writeRateLimiter = null;
+
+ TableRetryPolicy readRetryPolicy = deserializeObject(RemoteTableDescriptor.READ_RETRY_POLICY);
+ TableRetryPolicy writeRetryPolicy = null;
+
+ if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
+ retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-retry-executor");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
+
+ if (readRetryPolicy != null) {
+ readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
+ }
+
+ TableWriteFunction writeFn = getWriteFn();
+
+ boolean isRateLimited = readRateLimiter.isRateLimited();
+ if (!readOnly) {
+ writeCreditFn = deserializeObject(RemoteTableDescriptor.WRITE_CREDIT_FN);
+ writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
+ isRateLimited |= writeRateLimiter.isRateLimited();
+ writeRetryPolicy = deserializeObject(RemoteTableDescriptor.WRITE_RETRY_POLICY);
+ if (writeRetryPolicy != null) {
+ writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
+ }
+ }
+
+ // Optional executor for future callback/completion. Shared by both read and write operations.
+ int callbackPoolSize = Integer.parseInt(tableSpec.getConfig().get(RemoteTableDescriptor.ASYNC_CALLBACK_POOL_SIZE));
+ if (callbackPoolSize > 0) {
+ callbackExecutors.computeIfAbsent(tableId, (arg) ->
+ Executors.newFixedThreadPool(callbackPoolSize, (runnable) -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-" + tableId + "-async-callback-pool");
+ thread.setDaemon(true);
+ return thread;
+ }));
+ }
+
+ if (isRateLimited) {
+ tableExecutors.computeIfAbsent(tableId, (arg) ->
+ Executors.newSingleThreadExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-" + tableId + "-async-executor");
+ thread.setDaemon(true);
+ return thread;
+ }));
+ }
+
+ if (readOnly) {
+ table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
+ tableExecutors.get(tableId), callbackExecutors.get(tableId));
+ } else {
+ table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
+ writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
+ }
+
+ TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
+ if (readRetryPolicy != null) {
+ ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
+ }
+ if (writeRetryPolicy != null) {
+ ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
+ }
+
+ table.init(this.context);
+ tables.add(table);
+ return table;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ tables.forEach(t -> t.close());
+ tableExecutors.values().forEach(e -> e.shutdown());
+ callbackExecutors.values().forEach(e -> e.shutdown());
+ }
+
+ private <T> T deserializeObject(String key) {
+ String entry = tableSpec.getConfig().getOrDefault(key, "");
+ if (entry.isEmpty()) {
+ return null;
+ }
+ return SerdeUtils.deserialize(key, entry);
+ }
+
+ private TableReadFunction<?, ?> getReadFn() {
+ TableReadFunction<?, ?> readFn = deserializeObject(RemoteTableDescriptor.READ_FN);
+ if (readFn != null) {
+ readFn.init(this.context);
+ }
+ return readFn;
+ }
+
+ private TableWriteFunction<?, ?> getWriteFn() {
+ TableWriteFunction<?, ?> writeFn = deserializeObject(RemoteTableDescriptor.WRITE_FN);
+ if (writeFn != null) {
+ writeFn.init(this.context);
+ }
+ return writeFn;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
new file mode 100644
index 0000000..0eb88fd
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory class for a remote table provider
+ */
+public class RemoteTableProviderFactory implements TableProviderFactory {
+ @Override
+ public TableProvider getTableProvider(TableSpec tableSpec) {
+ Preconditions.checkNotNull(tableSpec, "null table spec");
+ return new RemoteTableProvider(tableSpec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
deleted file mode 100644
index c67a648..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.util.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/**
- * Helper class for remote table to throttle table IO requests with the configured rate limiter.
- * For each request, the needed credits are calculated with the configured credit functions.
- * The throttle methods are overloaded to support the possible CRUD operations.
- *
- * @param <K> type of the table key
- * @param <V> type of the table record
- */
-public class TableRateLimiter<K, V> {
- private static final Logger LOG = LoggerFactory.getLogger(TableRateLimiter.class);
-
- private final String tag;
- private final boolean rateLimited;
- private final CreditFunction<K, V> creditFn;
-
- @VisibleForTesting
- final RateLimiter rateLimiter;
-
- private Timer waitTimeMetric;
-
- /**
- * Function interface for providing rate limiting credits for each table record.
- * This interface allows callers to pass in lambda expressions which are otherwise
- * non-serializable as-is.
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
- @InterfaceStability.Unstable
- public interface CreditFunction<K, V> extends Serializable {
- /**
- * Get the number of credits required for the {@code key} and {@code value} pair.
- * @param key table key
- * @param value table record
- * @return number of credits
- */
- int getCredits(K key, V value);
- }
-
- /**
- * @param tableId table id of the table to be rate limited
- * @param rateLimiter actual rate limiter instance to be used
- * @param creditFn function for deriving the credits for each request
- * @param tag tag to be used with the rate limiter
- */
- public TableRateLimiter(String tableId, RateLimiter rateLimiter, CreditFunction<K, V> creditFn, String tag) {
- this.rateLimiter = rateLimiter;
- this.creditFn = creditFn;
- this.tag = tag;
- this.rateLimited = rateLimiter != null && rateLimiter.getSupportedTags().contains(tag);
- LOG.info("Rate limiting is {} for {}", rateLimited ? "enabled" : "disabled", tableId);
- }
-
- /**
- * Set up waitTimeMetric metric for latency reporting due to throttling.
- * @param timer waitTimeMetric metric
- */
- public void setTimerMetric(Timer timer) {
- Preconditions.checkNotNull(timer);
- this.waitTimeMetric = timer;
- }
-
- int getCredits(K key, V value) {
- return (creditFn == null) ? 1 : creditFn.getCredits(key, value);
- }
-
- int getCredits(Collection<K> keys) {
- if (creditFn == null) {
- return keys.size();
- } else {
- return keys.stream().mapToInt(k -> creditFn.getCredits(k, null)).sum();
- }
- }
-
- int getEntryCredits(Collection<Entry<K, V>> entries) {
- if (creditFn == null) {
- return entries.size();
- } else {
- return entries.stream().mapToInt(e -> creditFn.getCredits(e.getKey(), e.getValue())).sum();
- }
- }
-
- private void throttle(int credits) {
- if (!rateLimited) {
- return;
- }
-
- long startNs = System.nanoTime();
- rateLimiter.acquire(Collections.singletonMap(tag, credits));
- waitTimeMetric.update(System.nanoTime() - startNs);
- }
-
- /**
- * Throttle a request with a key argument if necessary.
- * @param key key used for the table request
- */
- public void throttle(K key) {
- throttle(getCredits(key, null));
- }
-
- /**
- * Throttle a request with both the key and value arguments if necessary.
- * @param key key used for the table request
- * @param value value used for the table request
- */
- public void throttle(K key, V value) {
- throttle(getCredits(key, value));
- }
-
- /**
- * Throttle a request with a collection of keys as the argument if necessary.
- * @param keys collection of keys used for the table request
- */
- public void throttle(Collection<K> keys) {
- throttle(getCredits(keys));
- }
-
- /**
- * Throttle a request with a collection of table records as the argument if necessary.
- * @param records collection of records used for the table request
- */
- public void throttleRecords(Collection<Entry<K, V>> records) {
- throttle(getEntryCredits(records));
- }
-
- /**
- * @return whether rate limiting is enabled for the associated table
- */
- public boolean isRateLimited() {
- return rateLimited;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
deleted file mode 100644
index 4791779..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.functions.ClosableFunction;
-import org.apache.samza.operators.functions.InitableFunction;
-
-import com.google.common.collect.Iterables;
-
-
-/**
- * A function object to be used with a {@link RemoteReadableTable} implementation. It encapsulates the functionality
- * of reading table record(s) for a provided set of key(s).
- *
- * <p> Instances of {@link TableReadFunction} are meant to be serializable. ie. any non-serializable state
- * (eg: network sockets) should be marked as transient and recreated inside readObject().
- *
- * <p> Implementations are expected to be thread-safe.
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-@InterfaceStability.Unstable
-public interface TableReadFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
- /**
- * Fetch single table record for a specified {@code key}. This method must be thread-safe.
- * The default implementation calls getAsync and blocks on the completion afterwards.
- * @param key key for the table record
- * @return table record for the specified {@code key}
- */
- default V get(K key) {
- try {
- return getAsync(key).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new SamzaException("GET failed for " + key, e);
- }
- }
-
- /**
- * Asynchronously fetch single table record for a specified {@code key}. This method must be thread-safe.
- * @param key key for the table record
- * @return CompletableFuture for the get request
- */
- CompletableFuture<V> getAsync(K key);
-
- /**
- * Fetch the table {@code records} for specified {@code keys}. This method must be thread-safe.
- * The default implementation calls getAllAsync and blocks on the completion afterwards.
- * @param keys keys for the table records
- * @return all records for the specified keys.
- */
- default Map<K, V> getAll(Collection<K> keys) {
- try {
- return getAllAsync(keys).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new SamzaException("GET_ALL failed for " + keys, e);
- }
- }
-
- /**
- * Asynchronously fetch the table {@code records} for specified {@code keys}. This method must be thread-safe.
- * The default implementation calls getAsync for each key and return a combined future.
- * @param keys keys for the table records
- * @return CompletableFuture for the get request
- */
- default CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
- Map<K, CompletableFuture<V>> getFutures = keys.stream().collect(
- Collectors.toMap(k -> k, k -> getAsync(k)));
-
- return CompletableFuture.allOf(
- Iterables.toArray(getFutures.values(), CompletableFuture.class))
- .thenApply(future ->
- getFutures.entrySet()
- .stream()
- .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join())));
- }
-
- /**
- * Determine whether the current operation can be retried with the last thrown exception.
- * @param exception exception thrown by a table operation
- * @return whether the operation can be retried
- */
- boolean isRetriable(Throwable exception);
-
- // optionally implement readObject() to initialize transient states
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
deleted file mode 100644
index d9d619f..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.functions.ClosableFunction;
-import org.apache.samza.operators.functions.InitableFunction;
-import org.apache.samza.storage.kv.Entry;
-
-import com.google.common.collect.Iterables;
-
-
-/**
- * A function object to be used with a {@link RemoteReadWriteTable} implementation. It encapsulates the functionality
- * of writing table record(s) for a provided set of key(s) to the store.
- *
- * <p> Instances of {@link TableWriteFunction} are meant to be serializable. ie. any non-serializable state
- * (eg: network sockets) should be marked as transient and recreated inside readObject().
- *
- * <p> Implementations are expected to be thread-safe.
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-@InterfaceStability.Unstable
-public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
- /**
- * Store single table {@code record} with specified {@code key}. This method must be thread-safe.
- * The default implementation calls putAsync and blocks on the completion afterwards.
- *
- * @param key key for the table record
- * @param record table record to be written
- */
- default void put(K key, V record) {
- try {
- putAsync(key, record).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new SamzaException("PUT failed for " + key, e);
- }
- }
-
- /**
- * Asynchronously store single table {@code record} with specified {@code key}. This method must be thread-safe.
- * @param key key for the table record
- * @param record table record to be written
- * @return CompletableFuture for the put request
- */
- CompletableFuture<Void> putAsync(K key, V record);
-
- /**
- * Store the table {@code records} with specified {@code keys}. This method must be thread-safe.
- * The default implementation calls putAllAsync and blocks on the completion afterwards.
- * @param records table records to be written
- */
- default void putAll(List<Entry<K, V>> records) {
- try {
- putAllAsync(records).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new SamzaException("PUT_ALL failed for " + records, e);
- }
- }
-
- /**
- * Asynchronously store the table {@code records} with specified {@code keys}. This method must be thread-safe.
- * The default implementation calls putAsync for each entry and return a combined future.
- * @param records table records to be written
- * @return CompletableFuture for the put request
- */
- default CompletableFuture<Void> putAllAsync(Collection<Entry<K, V>> records) {
- List<CompletableFuture<Void>> putFutures =
- records.stream().map(e -> putAsync(e.getKey(), e.getValue())).collect(Collectors.toList());
- return CompletableFuture.allOf(Iterables.toArray(putFutures, CompletableFuture.class));
- }
-
- /**
- * Delete the {@code record} with specified {@code key} from the remote store.
- * The default implementation calls deleteAsync and blocks on the completion afterwards.
- * @param key key to the table record to be deleted
- */
- default void delete(K key) {
- try {
- deleteAsync(key).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new SamzaException("DELETE failed for " + key, e);
- }
- }
-
- /**
- * Asynchronously delete the {@code record} with specified {@code key} from the remote store
- * @param key key to the table record to be deleted
- * @return CompletableFuture for the delete request
- */
- CompletableFuture<Void> deleteAsync(K key);
-
- /**
- * Delete all {@code records} with the specified {@code keys} from the remote store
- * The default implementation calls deleteAllAsync and blocks on the completion afterwards.
- * @param keys keys for the table records to be written
- */
- default void deleteAll(Collection<K> keys) {
- try {
- deleteAllAsync(keys).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new SamzaException("DELETE failed for " + keys, e);
- }
- }
-
- /**
- * Asynchronously delete all {@code records} with the specified {@code keys} from the remote store.
- * The default implementation calls deleteAsync for each key and return a combined future.
- *
- * @param keys keys for the table records to be written
- * @return CompletableFuture for the deleteAll request
- */
- default CompletableFuture<Void> deleteAllAsync(Collection<K> keys) {
- List<CompletableFuture<Void>> deleteFutures =
- keys.stream().map(this::deleteAsync).collect(Collectors.toList());
- return CompletableFuture.allOf(Iterables.toArray(deleteFutures, CompletableFuture.class));
- }
-
- /**
- * Determine whether the current operation can be retried with the last thrown exception.
- * @param exception exception thrown by a table operation
- * @return whether the operation can be retried
- */
- boolean isRetriable(Throwable exception);
-
- /**
- * Flush the remote store (optional)
- */
- default void flush() {
- }
-
- // optionally implement readObject() to initialize transient states
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/3da75e61/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
deleted file mode 100644
index 1d1bca6..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/descriptors/RemoteTableDescriptor.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote.descriptors;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.remote.TableRateLimiter;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.apache.samza.table.utils.SerdeUtils;
-import org.apache.samza.util.EmbeddedTaggedRateLimiter;
-import org.apache.samza.util.RateLimiter;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * Table descriptor for remote store backed tables
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
- /**
- * Tag to be used for provision credits for rate limiting read operations from the remote table.
- * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
- * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
- * TableRateLimiter.CreditFunction)}
- */
- public static final String RL_READ_TAG = "readTag";
-
- /**
- * Tag to be used for provision credits for rate limiting write operations into the remote table.
- * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
- * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
- * TableRateLimiter.CreditFunction)} and it needs the write functionality.
- */
- public static final String RL_WRITE_TAG = "writeTag";
-
- // Input support for a specific remote store (required)
- private TableReadFunction<K, V> readFn;
-
- // Output support for a specific remote store (optional)
- private TableWriteFunction<K, V> writeFn;
-
- // Rate limiter for client-side throttling; it is set by withRateLimiter()
- private RateLimiter rateLimiter;
-
- // Rates for constructing the default rate limiter when they are non-zero
- private Map<String, Integer> tagCreditsMap = new HashMap<>();
-
- private TableRateLimiter.CreditFunction<K, V> readCreditFn;
- private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
-
- private TableRetryPolicy readRetryPolicy;
- private TableRetryPolicy writeRetryPolicy;
-
- // By default execute future callbacks on the native client threads
- // ie. no additional thread pool for callbacks.
- private int asyncCallbackPoolSize = -1;
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- */
- public RemoteTableDescriptor(String tableId) {
- super(tableId);
- }
-
- /**
- * Constructs a table descriptor instance
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
- * @param serde the serde for key and value
- */
- public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
- super(tableId, serde);
- }
-
- @Override
- public TableSpec getTableSpec() {
- validate();
-
- Map<String, String> tableSpecConfig = new HashMap<>();
- generateTableSpecConfig(tableSpecConfig);
-
- // Serialize and store reader/writer functions
- tableSpecConfig.put(RemoteTableProvider.READ_FN, SerdeUtils.serialize("read function", readFn));
-
- if (writeFn != null) {
- tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
- }
-
- if (!tagCreditsMap.isEmpty()) {
- tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter",
- new EmbeddedTaggedRateLimiter(tagCreditsMap)));
- } else if (rateLimiter != null) {
- tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
- }
-
- // Serialize the readCredit functions
- if (readCreditFn != null) {
- tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
- "read credit function", readCreditFn));
- }
- // Serialize the writeCredit functions
- if (writeCreditFn != null) {
- tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
- "write credit function", writeCreditFn));
- }
-
- if (readRetryPolicy != null) {
- tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
- "read retry policy", readRetryPolicy));
- }
-
- if (writeRetryPolicy != null) {
- tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
- "write retry policy", writeRetryPolicy));
- }
-
- tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
-
- return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
- }
-
- /**
- * Use specified TableReadFunction with remote table and a retry policy.
- * @param readFn read function instance
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
- Preconditions.checkNotNull(readFn, "null read function");
- this.readFn = readFn;
- return this;
- }
-
- /**
- * Use specified TableWriteFunction with remote table and a retry policy.
- * @param writeFn write function instance
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
- Preconditions.checkNotNull(writeFn, "null write function");
- this.writeFn = writeFn;
- return this;
- }
-
- /**
- * Use specified TableReadFunction with remote table.
- * @param readFn read function instance
- * @param retryPolicy retry policy for the read function
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
- Preconditions.checkNotNull(readFn, "null read function");
- Preconditions.checkNotNull(retryPolicy, "null retry policy");
- this.readFn = readFn;
- this.readRetryPolicy = retryPolicy;
- return this;
- }
-
- /**
- * Use specified TableWriteFunction with remote table.
- * @param writeFn write function instance
- * @param retryPolicy retry policy for the write function
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
- Preconditions.checkNotNull(writeFn, "null write function");
- Preconditions.checkNotNull(retryPolicy, "null retry policy");
- this.writeFn = writeFn;
- this.writeRetryPolicy = retryPolicy;
- return this;
- }
-
- /**
- * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
- * of credits to be charged from the rate limiter for table read and write operations.
- * This is an advanced API that provides greater flexibility to throttle each record in the table
- * with different number of credits. For most common use-cases eg: limit the number of read/write
- * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
- * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
- *
- * @param rateLimiter rate limiter instance to be used for throttling
- * @param readCreditFn credit function for rate limiting read operations
- * @param writeCreditFn credit function for rate limiting write operations
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
- TableRateLimiter.CreditFunction<K, V> readCreditFn,
- TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
- Preconditions.checkNotNull(rateLimiter, "null read rate limiter");
- this.rateLimiter = rateLimiter;
- this.readCreditFn = readCreditFn;
- this.writeCreditFn = writeCreditFn;
- return this;
- }
-
- /**
- * Specify the rate limit for table read operations. If the read rate limit is set with this method
- * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
- * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
- * and vice versa.
- * @param creditsPerSec rate limit for read operations; must be positive
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
- Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
- tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
- return this;
- }
-
- /**
- * Specify the rate limit for table write operations. If the write rate limit is set with this method
- * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
- * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
- * and vice versa.
- * @param creditsPerSec rate limit for write operations; must be positive
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
- Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
- tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
- return this;
- }
-
- /**
- * Specify the size of the thread pool for the executor used to execute
- * callbacks of CompletableFutures of async Table operations. By default, these
- * futures are completed (called) by the threads of the native store client. Depending
- * on the implementation of the native client, it may or may not allow executing long
- * running operations in the callbacks. This config can be used to execute the callbacks
- * from a separate executor to decouple from the native client. If configured, this
- * thread pool is shared by all read and write operations.
- * @param poolSize max number of threads in the executor for async callbacks
- * @return this table descriptor instance
- */
- public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
- this.asyncCallbackPoolSize = poolSize;
- return this;
- }
-
- @Override
- protected void validate() {
- super.validate();
- Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
- Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
- "Only one of rateLimiter instance or read/write limits can be specified");
- // Assume callback executor pool should have no more than 20 threads
- Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
- "too many threads for async callback executor.");
- }
-}