You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/09 22:58:00 UTC
[1/2] samza git commit: SAMZA-1610: Implementation of remote table
provider
Repository: samza
Updated Branches:
refs/heads/master 1971d596c -> 2be7061d4
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
index 9c79766..05fa19a 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
@@ -18,14 +18,22 @@
*/
package org.apache.samza.util;
+import java.lang.reflect.Field;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.task.TaskContext;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestEmbeddedTaggedRateLimiter {
@@ -35,42 +43,71 @@ public class TestEmbeddedTaggedRateLimiter {
final static private int TARGET_RATE_RED = 1000;
final static private int TARGET_RATE_PER_TASK_RED = TARGET_RATE_RED / NUMBER_OF_TASKS;
final static private int TARGET_RATE_GREEN = 2000;
- final static private int TARGET_RATE_PER_TASK_GREEN = TARGET_RATE_GREEN / NUMBER_OF_TASKS;
final static private int INCREMENT = 2;
+ final static private int TARGET_RATE = 4000;
+ final static private int TARGET_RATE_PER_TASK = TARGET_RATE / NUMBER_OF_TASKS;
+
@Test
@Ignore("Flaky Test: Test fails in travis.")
public void testAcquire() {
- RateLimiter rateLimiter = createRateLimiter();
+ RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(TARGET_RATE);
+ initRateLimiter(rateLimiter);
- Map<String, Integer> tagToCount = new HashMap<>();
- tagToCount.put("red", 0);
- tagToCount.put("green", 0);
+ int count = 0;
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < TEST_INTERVAL) {
+ rateLimiter.acquire(INCREMENT);
+ count += INCREMENT;
+ }
- Map<String, Integer> tagToCredits = new HashMap<>();
- tagToCredits.put("red", INCREMENT);
- tagToCredits.put("green", INCREMENT);
+ long rate = count * 1000 / TEST_INTERVAL;
+ verifyRate(rate);
+ }
+
+ @Test
+ public void testAcquireWithTimeout() {
+ RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(TARGET_RATE);
+ initRateLimiter(rateLimiter);
+
+ boolean hasSeenZeros = false;
+ int count = 0;
+ int callCount = 0;
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TEST_INTERVAL) {
- rateLimiter.acquire(tagToCredits);
- tagToCount.put("red", tagToCount.get("red") + INCREMENT);
- tagToCount.put("green", tagToCount.get("green") + INCREMENT);
+ ++callCount;
+ int availableCredits = rateLimiter.acquire(INCREMENT, 20, MILLISECONDS);
+ if (availableCredits <= 0) {
+ hasSeenZeros = true;
+ } else {
+ count += INCREMENT;
+ }
}
- {
- long rate = tagToCount.get("red") * 1000 / TEST_INTERVAL;
- verifyRate(rate, TARGET_RATE_PER_TASK_RED);
- } {
- // Note: due to blocking, green is capped at red's QPS
- long rate = tagToCount.get("green") * 1000 / TEST_INTERVAL;
- verifyRate(rate, TARGET_RATE_PER_TASK_RED);
- }
+ long rate = count * 1000 / TEST_INTERVAL;
+ verifyRate(rate);
+ junit.framework.Assert.assertTrue(Math.abs(callCount - TARGET_RATE_PER_TASK * TEST_INTERVAL / 1000 / INCREMENT) <= 2);
+ junit.framework.Assert.assertFalse(hasSeenZeros);
}
- @Test
- public void testTryAcquire() {
+ @Test(expected = IllegalStateException.class)
+ public void testFailsWhenUninitialized() {
+ new EmbeddedTaggedRateLimiter(100).acquire(1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailsWhenUsingTags() {
+ RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(10);
+ initRateLimiter(rateLimiter);
+ Map<String, Integer> tagToCredits = new HashMap<>();
+ tagToCredits.put("red", 1);
+ tagToCredits.put("green", 1);
+ rateLimiter.acquire(tagToCredits);
+ }
+ @Test
+ public void testAcquireTagged() {
RateLimiter rateLimiter = createRateLimiter();
Map<String, Integer> tagToCount = new HashMap<>();
@@ -83,22 +120,23 @@ public class TestEmbeddedTaggedRateLimiter {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TEST_INTERVAL) {
- Map<String, Integer> resultMap = rateLimiter.tryAcquire(tagToCredits);
- tagToCount.put("red", tagToCount.get("red") + resultMap.get("red"));
- tagToCount.put("green", tagToCount.get("green") + resultMap.get("green"));
+ rateLimiter.acquire(tagToCredits);
+ tagToCount.put("red", tagToCount.get("red") + INCREMENT);
+ tagToCount.put("green", tagToCount.get("green") + INCREMENT);
}
{
long rate = tagToCount.get("red") * 1000 / TEST_INTERVAL;
verifyRate(rate, TARGET_RATE_PER_TASK_RED);
} {
+ // Note: due to blocking, green is capped at red's QPS
long rate = tagToCount.get("green") * 1000 / TEST_INTERVAL;
- verifyRate(rate, TARGET_RATE_PER_TASK_GREEN);
+ verifyRate(rate, TARGET_RATE_PER_TASK_RED);
}
}
@Test
- public void testAcquireWithTimeout() {
+ public void testAcquireWithTimeoutTagged() {
RateLimiter rateLimiter = createRateLimiter();
@@ -128,7 +166,7 @@ public class TestEmbeddedTaggedRateLimiter {
}
@Test(expected = IllegalStateException.class)
- public void testFailsWhenUninitialized() {
+ public void testFailsWhenUninitializedTagged() {
Map<String, Integer> tagToTargetRateMap = new HashMap<>();
tagToTargetRateMap.put("red", 1000);
tagToTargetRateMap.put("green", 2000);
@@ -141,14 +179,14 @@ public class TestEmbeddedTaggedRateLimiter {
tagToCredits.put("red", 1);
tagToCredits.put("green", 1);
RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(tagToCredits);
- TestEmbeddedRateLimiter.initRateLimiter(rateLimiter);
+ initRateLimiter(rateLimiter);
rateLimiter.acquire(1);
}
private void verifyRate(long rate, long targetRate) {
// As the actual rate would likely not be exactly the same as target rate, the calculation below
- // verifies the actual rate is within 5% of the target rate per task
- Assert.assertTrue(Math.abs(rate - targetRate) <= targetRate * 5 / 100);
+ // verifies the actual rate is within 10% of the target rate per task
+ Assert.assertTrue(Math.abs(rate - targetRate) <= targetRate * 10 / 100);
}
private RateLimiter createRateLimiter() {
@@ -156,8 +194,36 @@ public class TestEmbeddedTaggedRateLimiter {
tagToTargetRateMap.put("red", TARGET_RATE_RED);
tagToTargetRateMap.put("green", TARGET_RATE_GREEN);
RateLimiter rateLimiter = new EmbeddedTaggedRateLimiter(tagToTargetRateMap);
- TestEmbeddedRateLimiter.initRateLimiter(rateLimiter);
+ initRateLimiter(rateLimiter);
return rateLimiter;
}
+ private void verifyRate(long rate) {
+ // As the actual rate would likely not be exactly the same as target rate, the calculation below
+ // verifies the actual rate is within 5% of the target rate per task
+ Assert.assertTrue(Math.abs(rate - TARGET_RATE_PER_TASK) <= TARGET_RATE_PER_TASK * 5 / 100);
+ }
+
+ static void initRateLimiter(RateLimiter rateLimiter) {
+ Config config = mock(Config.class);
+ TaskContext taskContext = mock(TaskContext.class);
+ SamzaContainerContext containerContext = mockSamzaContainerContext();
+ when(taskContext.getSamzaContainerContext()).thenReturn(containerContext);
+ rateLimiter.init(config, taskContext);
+ }
+
+ static SamzaContainerContext mockSamzaContainerContext() {
+ try {
+ Collection<String> taskNames = mock(Collection.class);
+ when(taskNames.size()).thenReturn(NUMBER_OF_TASKS);
+ SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
+ Field taskNamesField = SamzaContainerContext.class.getDeclaredField("taskNames");
+ taskNamesField.setAccessible(true);
+ taskNamesField.set(containerContext, taskNames);
+ taskNamesField.setAccessible(false);
+ return containerContext;
+ } catch (Exception ex) {
+ throw new SamzaException(ex);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index 4af0f1d..b494eba 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -25,18 +25,26 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.storage.StorageEngine;
-import org.apache.samza.table.LocalStoreBackedTableProvider;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.Table;
+import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
- * Base class for tables backed by Samza stores, see {@link LocalStoreBackedTableProvider}.
+ * Base class for tables backed by Samza local stores. The backing stores are
+ * injected during initialization of the table. Since the lifecycle
+ * of the underlying stores is already managed by the Samza container,
+ * the table provider will not manage the lifecycle of the backing
+ * stores.
*/
-abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBackedTableProvider {
+abstract public class BaseLocalStoreBackedTableProvider implements TableProvider {
protected final Logger logger = LoggerFactory.getLogger(getClass());
@@ -44,13 +52,28 @@ abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBac
protected KeyValueStore kvStore;
+ protected SamzaContainerContext containerContext;
+
+ protected TaskContext taskContext;
+
public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
this.tableSpec = tableSpec;
}
@Override
- public void init(StorageEngine store) {
- kvStore = (KeyValueStore) store;
+ public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+ this.containerContext = containerContext;
+ this.taskContext = taskContext;
+
+ Preconditions.checkNotNull(this.taskContext, "Must specify task context for local tables.");
+
+ kvStore = (KeyValueStore) taskContext.getStore(tableSpec.getId());
+
+ if (kvStore == null) {
+ throw new SamzaException(String.format(
+ "Backing store for table %s was not injected by SamzaContainer", tableSpec.getId()));
+ }
+
logger.info("Initialized backing store for table " + tableSpec.getId());
}
@@ -59,17 +82,9 @@ abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBac
if (kvStore == null) {
throw new SamzaException("Store not initialized for table " + tableSpec.getId());
}
- return new LocalStoreBackedReadWriteTable(kvStore);
- }
-
- @Override
- public void start() {
- logger.info("Starting table provider for table " + tableSpec.getId());
- }
-
- @Override
- public void stop() {
- logger.info("Stopping table provider for table " + tableSpec.getId());
+ ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
+ table.init(containerContext, taskContext);
+ return table;
}
protected Map<String, String> generateCommonStoreConfig(Map<String, String> config) {
@@ -89,4 +104,9 @@ abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBac
return storeConfig;
}
+
+ @Override
+ public void close() {
+ logger.info("Shutting down table provider for table " + tableSpec.getId());
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
index 3149c86..4037f60 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
@@ -36,8 +36,8 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
* Constructs an instance of {@link LocalStoreBackedReadWriteTable}
* @param kvStore the backing store
*/
- public LocalStoreBackedReadWriteTable(KeyValueStore kvStore) {
- super(kvStore);
+ public LocalStoreBackedReadWriteTable(String tableId, KeyValueStore kvStore) {
+ super(tableId, kvStore);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
index fead086..5ff58ab 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
@@ -24,6 +24,8 @@ import java.util.stream.Collectors;
import org.apache.samza.table.ReadableTable;
+import com.google.common.base.Preconditions;
+
/**
* A store backed readable table
@@ -34,12 +36,16 @@ import org.apache.samza.table.ReadableTable;
public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> {
protected KeyValueStore<K, V> kvStore;
+ protected String tableId;
/**
* Constructs an instance of {@link LocalStoreBackedReadableTable}
* @param kvStore the backing store
*/
- public LocalStoreBackedReadableTable(KeyValueStore<K, V> kvStore) {
+ public LocalStoreBackedReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
+ Preconditions.checkArgument(tableId != null && !tableId.isEmpty(), "invalid tableId"); // && keeps a null tableId from throwing NPE
+ Preconditions.checkNotNull(kvStore, "null KeyValueStore");
+ this.tableId = tableId;
this.kvStore = kvStore;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
index 9c95637..d30c18f 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
@@ -27,11 +27,13 @@ import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.storage.StorageEngine;
import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
import org.junit.Before;
import org.junit.Test;
import junit.framework.Assert;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -55,7 +57,9 @@ public class TestLocalBaseStoreBackedTableProvider {
@Test
public void testInit() {
StorageEngine store = mock(KeyValueStorageEngine.class);
- tableProvider.init(store);
+ TaskContext taskContext = mock(TaskContext.class);
+ when(taskContext.getStore(any())).thenReturn(store);
+ tableProvider.init(null, taskContext);
Assert.assertNotNull(tableProvider.getTable());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index 8f7eb5d..23fa9e6 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -74,7 +74,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
Profile[] profiles = TestTableData.generateProfiles(count);
int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig();
+ Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
configs.put("streams.Profile.samza.system", "test");
configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
@@ -112,7 +112,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
Profile[] profiles = TestTableData.generateProfiles(count);
int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig();
+ Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
configs.put("streams.PageView.samza.system", "test");
configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
@@ -170,7 +170,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
Profile[] profiles = TestTableData.generateProfiles(count);
int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig();
+ Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
configs.put("streams.Profile1.samza.system", "test");
configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
@@ -239,7 +239,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
}
- private Map<String, String> getBaseJobConfig() {
+ static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
Map<String, String> configs = new HashMap<>();
configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
@@ -251,8 +251,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
// For intermediate streams
configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
- configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
- configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+ configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
+ configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
configs.put("systems.kafka.samza.key.serde", "int");
configs.put("systems.kafka.samza.msg.serde", "json");
configs.put("systems.kafka.default.stream.replication.factor", "1");
@@ -281,7 +281,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
}
}
- private class PageViewToProfileJoinFunction implements StreamTableJoinFunction
+ static class PageViewToProfileJoinFunction implements StreamTableJoinFunction
<Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
private int count;
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
new file mode 100644
index 0000000..a260c3f
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.remote.RemoteReadableTable;
+import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+
+public class TestRemoteTable extends AbstractIntegrationTestHarness {
+ private TableReadFunction<Integer, TestTableData.Profile> getInMemoryReader(TestTableData.Profile[] profiles) {
+ final Map<Integer, TestTableData.Profile> profileMap = Arrays.stream(profiles)
+ .collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
+ TableReadFunction<Integer, TestTableData.Profile> reader =
+ (TableReadFunction<Integer, TestTableData.Profile>) key -> profileMap.getOrDefault(key, null);
+ return reader;
+ }
+
+ static List<TestTableData.EnrichedPageView> writtenRecords = new LinkedList<>();
+
+ static class InMemoryWriteFunction implements TableWriteFunction<Integer, TestTableData.EnrichedPageView> {
+ private transient List<TestTableData.EnrichedPageView> records;
+
+ // Verify serializable functionality
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ // Write to the global list for verification
+ records = writtenRecords;
+ }
+
+ @Override
+ public void put(Integer key, TestTableData.EnrichedPageView record) {
+ records.add(record);
+ }
+
+ @Override
+ public void delete(Integer key) {
+ records.removeIf(record -> key.equals(record.getMemberId())); // list holds records, so match on the member-id key
+ }
+
+ @Override
+ public void deleteAll(Collection<Integer> keys) {
+ records.removeIf(record -> keys.contains(record.getMemberId())); // list holds records, so match on the member-id key
+ }
+ }
+
+ @Test
+ public void testStreamTableJoinRemoteTable() throws Exception {
+ List<TestTableData.PageView> received = new LinkedList<>();
+ final InMemoryWriteFunction writer = new InMemoryWriteFunction();
+
+ int count = 10;
+ TestTableData.PageView[] pageViews = TestTableData.generatePageViews(count);
+ TestTableData.Profile[] profiles = TestTableData.generateProfiles(count);
+
+ int partitionCount = 4;
+ Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+ configs.put("streams.PageView.samza.system", "test");
+ configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+ configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+
+ final RateLimiter readRateLimiter = mock(RateLimiter.class);
+ final RateLimiter writeRateLimiter = mock(RateLimiter.class);
+ final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+ final StreamApplication app = (streamGraph, cfg) -> {
+ RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
+ inputTableDesc
+ .withReadFunction(getInMemoryReader(profiles))
+ .withRateLimiter(readRateLimiter, null, null);
+
+ RemoteTableDescriptor<Integer, TestTableData.EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
+ outputTableDesc
+ .withReadFunction(key -> null) // dummy reader
+ .withWriteFunction(writer)
+ .withRateLimiter(writeRateLimiter, null, null);
+
+ Table<KV<Integer, TestTableData.Profile>> inputTable = streamGraph.getTable(inputTableDesc);
+ Table<KV<Integer, TestTableData.EnrichedPageView>> outputTable = streamGraph.getTable(outputTableDesc);
+
+ streamGraph.getInputStream("PageView", new NoOpSerde<TestTableData.PageView>())
+ .map(pv -> {
+ received.add(pv);
+ return new KV<Integer, TestTableData.PageView>(pv.getMemberId(), pv);
+ })
+ .join(inputTable, new TestLocalTable.PageViewToProfileJoinFunction())
+ .map(m -> new KV(m.getMemberId(), m))
+ .sendTo(outputTable);
+ };
+
+ runner.run(app);
+ runner.waitForFinish();
+
+ int numExpected = count * partitionCount;
+ Assert.assertEquals(numExpected, received.size());
+ Assert.assertEquals(numExpected, writtenRecords.size());
+ Assert.assertTrue(writtenRecords.get(0) instanceof TestTableData.EnrichedPageView);
+ }
+
+ private TaskContext createMockTaskContext() {
+ MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+ doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
+ doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
+ TaskContext context = mock(TaskContext.class);
+ doReturn(metricsRegistry).when(context).getMetricsRegistry();
+ return context;
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testCatchReaderException() {
+ TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
+ doThrow(new RuntimeException("Expected test exception")).when(reader).get(anyString());
+ RemoteReadableTable<String, ?> table = new RemoteReadableTable<>("table1", reader, null, null);
+ table.init(mock(SamzaContainerContext.class), createMockTaskContext());
+ table.get("abc");
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testCatchWriterException() {
+ TableReadFunction<String, String> reader = mock(TableReadFunction.class);
+ TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
+ doThrow(new RuntimeException("Expected test exception")).when(writer).put(anyString(), any());
+ RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<>("table1", reader, writer, null, null, null);
+ table.init(mock(SamzaContainerContext.class), createMockTaskContext());
+ table.put("abc", "efg");
+ }
+}
[2/2] samza git commit: SAMZA-1610: Implementation of remote table
provider
Posted by ja...@apache.org.
SAMZA-1610: Implementation of remote table provider
Please see commit messages for detailed descriptions.
Author: Peng Du <pd...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>, Wei Song <ws...@linkedin.com>
Closes #432 from pdu-mn1/remote-table-0222
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2be7061d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2be7061d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2be7061d
Branch: refs/heads/master
Commit: 2be7061d4a6ef6345b6f0566cdfeb9bbf279686f
Parents: 1971d59
Author: Peng Du <pd...@linkedin.com>
Authored: Fri Mar 9 14:57:56 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Mar 9 14:57:56 2018 -0800
----------------------------------------------------------------------
.../table/LocalStoreBackedTableProvider.java | 37 ---
.../org/apache/samza/table/ReadableTable.java | 11 +-
.../org/apache/samza/table/TableProvider.java | 18 +-
.../java/org/apache/samza/util/RateLimiter.java | 20 +-
.../org/apache/samza/table/TableManager.java | 50 ++--
.../samza/table/remote/CreditFunction.java | 36 +++
.../table/remote/RemoteReadWriteTable.java | 178 ++++++++++++++
.../samza/table/remote/RemoteReadableTable.java | 181 ++++++++++++++
.../table/remote/RemoteTableDescriptor.java | 214 ++++++++++++++++
.../samza/table/remote/RemoteTableProvider.java | 154 ++++++++++++
.../remote/RemoteTableProviderFactory.java | 38 +++
.../samza/table/remote/TableReadFunction.java | 66 +++++
.../samza/table/remote/TableWriteFunction.java | 81 ++++++
.../apache/samza/util/EmbeddedRateLimiter.java | 98 --------
.../samza/util/EmbeddedTaggedRateLimiter.java | 48 ++--
.../apache/samza/container/SamzaContainer.scala | 3 +-
.../apache/samza/container/TaskInstance.scala | 4 +-
.../apache/samza/table/TestTableManager.java | 14 +-
.../table/remote/TestRemoteTableDescriptor.java | 244 +++++++++++++++++++
.../samza/util/TestEmbeddedRateLimiter.java | 155 ------------
.../util/TestEmbeddedTaggedRateLimiter.java | 128 +++++++---
.../kv/BaseLocalStoreBackedTableProvider.java | 54 ++--
.../kv/LocalStoreBackedReadWriteTable.java | 4 +-
.../kv/LocalStoreBackedReadableTable.java | 8 +-
.../TestLocalBaseStoreBackedTableProvider.java | 6 +-
.../apache/samza/test/table/TestLocalTable.java | 14 +-
.../samza/test/table/TestRemoteTable.java | 180 ++++++++++++++
27 files changed, 1605 insertions(+), 439 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java b/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
deleted file mode 100644
index 21630ab..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import org.apache.samza.storage.StorageEngine;
-
-
-/**
- * Interface for tables backed by Samza local stores. The backing stores are
- * injected during initialization of the table. Since the lifecycle
- * of the underlying stores are already managed by Samza container,
- * the table provider will not manage the lifecycle of the backing
- * stores.
- */
-public interface LocalStoreBackedTableProvider extends TableProvider {
- /**
- * Initializes the table provider with the backing store
- * @param store the backing store
- */
- void init(StorageEngine store);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
index 5ad6e0f..15b6115 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
@@ -22,7 +22,9 @@ import java.util.List;
import java.util.Map;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.operators.KV;
+import org.apache.samza.task.TaskContext;
/**
@@ -34,6 +36,14 @@ import org.apache.samza.operators.KV;
*/
@InterfaceStability.Unstable
public interface ReadableTable<K, V> extends Table<KV<K, V>> {
+ /**
+ * Initializes the table during container initialization.
+ * Guaranteed to be invoked as the first operation on the table.
+ * The default implementation does nothing.
+ * @param containerContext Samza container context
+ * @param taskContext context of the current task; may be {@code null} for a global table
+ */
+ default void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+ }
/**
* Gets the value associated with the specified {@code key}.
@@ -57,5 +67,4 @@ public interface ReadableTable<K, V> extends Table<KV<K, V>> {
* Close the table and release any resources acquired
*/
void close();
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 54c6f5d..bbbe38a 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -21,6 +21,8 @@ package org.apache.samza.table;
import java.util.Map;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.task.TaskContext;
/**
@@ -30,6 +32,13 @@ import org.apache.samza.annotation.InterfaceStability;
@InterfaceStability.Unstable
public interface TableProvider {
/**
+ * Initialize TableProvider with container and task context
+ * @param containerContext Samza container context
+ * @param taskContext nullable for global table
+ */
+ void init(SamzaContainerContext containerContext, TaskContext taskContext);
+
+ /**
* Get an instance of the table for read/write operations
* @return the underlying table
*/
@@ -46,12 +55,7 @@ public interface TableProvider {
Map<String, String> generateConfig(Map<String, String> config);
/**
- * Start the underlying table
- */
- void start();
-
- /**
- * Stop the underlying table
+ * Shutdown the underlying table
*/
- void stop();
+ void close();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
index 75818dd..ad40d35 100644
--- a/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
+++ b/samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
@@ -20,6 +20,7 @@ package org.apache.samza.util;
import java.io.Serializable;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.samza.annotation.InterfaceStability;
@@ -43,7 +44,6 @@ import org.apache.samza.task.TaskContext;
* <ul>
* <li>Block indefinitely until requested credits become available</li>
* <li>Block for a provided amount of time, then return available credits</li>
- * <li>Non-blocking, returns immediately available credits</li>
* </ul>
*
*/
@@ -80,15 +80,6 @@ public interface RateLimiter extends Serializable {
int acquire(int numberOfCredit, long timeout, TimeUnit unit);
/**
- * Attempt to acquire the provided number of credits, returns immediately number of
- * credits acquired.
- *
- * @param numberOfCredit requested number of credits
- * @return number of credits acquired
- */
- int tryAcquire(int numberOfCredit);
-
- /**
* Attempt to acquire the provided number of credits for a number of tags, blocks indefinitely
* until all requested credits become available
*
@@ -110,11 +101,8 @@ public interface RateLimiter extends Serializable {
Map<String, Integer> acquire(Map<String, Integer> tagToCreditMap, long timeout, TimeUnit unit);
/**
- * Attempt to acquire the provided number of credits for a number of tags, returns immediately number of
- * credits acquired.
- *
- * @param tagToCreditMap a map of requested number of credits keyed by tag
- * @return a map of number of credits acquired keyed by tag
+ * Get the entire set of tags for which we have configured credits for rate limiting.
+ * @return set of supported tags
*/
- Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditMap);
+ Set<String> getSupportedTags();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index c3555f3..bada304 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -24,13 +24,17 @@ import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
+
/**
* A {@link TableManager} manages tables within a Samza task. For each table, it maintains
* the {@link TableSpec} and the {@link TableProvider}. It is used at execution for
@@ -61,15 +65,14 @@ public class TableManager {
// tableId -> TableCtx
private final Map<String, TableCtx> tables = new HashMap<>();
- private boolean localTablesInitialized;
+ private boolean initialized;
/**
* Construct a table manager instance
- * @param config the job configuration
+ * @param config job configuration
* @param serdes Serde instances for tables
*/
public TableManager(Config config, Map<String, Serde<Object>> serdes) {
-
new JavaTableConfig(config).getTableIds().forEach(tableId -> {
// Construct the table provider
@@ -91,23 +94,14 @@ public class TableManager {
}
/**
- * Initialize all local table
- * @param stores stores created locally
+ * Initialize table providers with container and task contexts
+ * @param containerContext context for the Samza container
+ * @param taskContext context for the current task, nullable for global tables
*/
- public void initLocalTables(Map<String, StorageEngine> stores) {
- tables.values().forEach(ctx -> {
- if (ctx.tableProvider instanceof LocalStoreBackedTableProvider) {
- StorageEngine store = stores.get(ctx.tableSpec.getId());
- if (store == null) {
- throw new SamzaException(String.format(
- "Backing store for table %s was not injected by SamzaContainer",
- ctx.tableSpec.getId()));
- }
- ((LocalStoreBackedTableProvider) ctx.tableProvider).init(store);
- }
- });
-
- localTablesInitialized = true;
+ public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+ Preconditions.checkNotNull(containerContext, "null container context.");
+ tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
+ initialized = true;
}
/**
@@ -126,17 +120,10 @@ public class TableManager {
}
/**
- * Start the table manager, internally it starts all tables
- */
- public void start() {
- tables.values().forEach(ctx -> ctx.tableProvider.start());
- }
-
- /**
* Shutdown the table manager, internally it shuts down all tables
*/
- public void shutdown() {
- tables.values().forEach(ctx -> ctx.tableProvider.stop());
+ public void close() {
+ tables.values().forEach(ctx -> ctx.tableProvider.close());
}
/**
@@ -145,9 +132,10 @@ public class TableManager {
* @return table instance
*/
public Table getTable(String tableId) {
- if (!localTablesInitialized) {
- throw new IllegalStateException("Local tables in TableManager not initialized.");
+ if (!initialized) {
+ throw new IllegalStateException("TableManager has not been initialized.");
}
+ Preconditions.checkArgument(tables.containsKey(tableId), "Unknown tableId=" + tableId);
return tables.get(tableId).tableProvider.getTable();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java
new file mode 100644
index 0000000..0d30098
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+import org.apache.samza.operators.KV;
+
+
+/**
+ * Functional interface that maps a table record, given as a {@link KV}, to the number of
+ * rate limiting credits to be charged for it.
+ * <p>
+ * It extends {@link Serializable} in addition to {@link Function} so that a lambda expression
+ * or method reference assigned to it is serializable, which it would not be as a plain
+ * {@link Function}.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+@FunctionalInterface
+public interface CreditFunction<K, V> extends Function<KV<K, V>, Integer>, Serializable {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
new file mode 100644
index 0000000..a47e349
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
+
+
+/**
+ * Remote store backed read writable table. Reads are inherited from {@link RemoteReadableTable};
+ * writes are delegated to a {@link TableWriteFunction}. When the rate limiter supports
+ * {@link RemoteTableDescriptor#RL_WRITE_TAG}, single-record writes and flushes are throttled
+ * before they are issued.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
+  protected final TableWriteFunction<K, V> writeFn;
+  protected final CreditFunction<K, V> writeCreditFn;
+  protected final boolean rateLimitWrites;
+
+  // Metrics are created in init(), not in the constructor
+  protected Timer putNs;
+  protected Timer deleteNs;
+  protected Timer flushNs;
+  protected Timer putThrottleNs; // use single timer for all write operations
+  protected Counter numPuts;
+  protected Counter numDeletes;
+  protected Counter numFlushes;
+
+  /**
+   * Construct a RemoteReadWriteTable instance
+   * @param tableId table id
+   * @param readFn {@link TableReadFunction} for read operations
+   * @param writeFn {@link TableWriteFunction} for write operations; must not be null
+   * @param ratelimiter optional {@link RateLimiter} for throttling reads and writes
+   * @param readCreditFn function returning a credit to be charged for rate limiting per record read
+   * @param writeCreditFn function returning a credit to be charged for rate limiting per record written
+   */
+  public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
+      RateLimiter ratelimiter, CreditFunction<K, V> readCreditFn, CreditFunction<K, V> writeCreditFn) {
+    super(tableId, readFn, ratelimiter, readCreditFn);
+    Preconditions.checkNotNull(writeFn, "null write function");
+    this.writeFn = writeFn;
+    this.writeCreditFn = writeCreditFn;
+    // "rateLimiter" (capital L) is the field inherited from RemoteReadableTable, which the super
+    // constructor has just set from the "ratelimiter" parameter
+    this.rateLimitWrites = rateLimiter != null && rateLimiter.getSupportedTags().contains(RL_WRITE_TAG);
+    logger.info("Rate limiting is {} for remote write operations", rateLimitWrites ? "enabled" : "disabled");
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Registers the write timers and counters of this table in the metrics registry of the task,
+   * after the read metrics have been registered by the super class.
+   * @throws NullPointerException if {@code taskContext} is null
+   */
+  @Override
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    // The table contract allows a null task context for global tables, but the metrics of this
+    // table come from the task's registry: fail with a clear message instead of a bare NPE
+    Preconditions.checkNotNull(taskContext, "null task context");
+    super.init(containerContext, taskContext);
+    putNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-ns");
+    putThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-throttle-ns");
+    deleteNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-delete-ns");
+    flushNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-flush-ns");
+    numPuts = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-puts");
+    numDeletes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-deletes");
+    numFlushes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-flushes");
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The put is counted, throttled if write rate limiting is enabled, and the time spent in the
+   * write function is recorded. Any failure is logged and rethrown as a {@link SamzaException}.
+   */
+  @Override
+  public void put(K key, V value) {
+    try {
+      numPuts.inc();
+      if (rateLimitWrites) {
+        throttle(key, value, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      writeFn.put(key, value);
+      putNs.update(System.nanoTime() - startNs);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to put a record, key=%s, value=%s", key, value);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Delegates directly to the write function: unlike {@link #put(Object, Object)}, no rate
+   * limiting is applied and no metrics are recorded.
+   */
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    try {
+      writeFn.putAll(entries);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to put records: %s", entries);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The delete is counted, throttled (with a null value) if write rate limiting is enabled, and
+   * the time spent in the write function is recorded. Any failure is logged and rethrown as a
+   * {@link SamzaException}.
+   */
+  @Override
+  public void delete(K key) {
+    try {
+      numDeletes.inc();
+      if (rateLimitWrites) {
+        throttle(key, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      writeFn.delete(key);
+      deleteNs.update(System.nanoTime() - startNs);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to delete a record, key=%s", key);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Delegates directly to the write function: unlike {@link #delete(Object)}, no rate limiting
+   * is applied and no metrics are recorded.
+   */
+  @Override
+  public void deleteAll(List<K> keys) {
+    try {
+      writeFn.deleteAll(keys);
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to delete records, keys=%s", keys);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The flush is counted and, if write rate limiting is enabled, throttled with a record whose
+   * key and value are both null, so the write credit function must tolerate such a record.
+   */
+  @Override
+  public void flush() {
+    try {
+      numFlushes.inc();
+      if (rateLimitWrites) {
+        throttle(null, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      writeFn.flush();
+      flushNs.update(System.nanoTime() - startNs);
+    } catch (Exception e) {
+      String errMsg = "Failed to flush remote store";
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Closes the read function (through the super class) and then the write function.
+   */
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      // Release the write function even when closing the read function throws
+      writeFn.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
new file mode 100644
index 0000000..ca8e96b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
+
+
+/**
+ * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service.
+ * <p>
+ * Many stream-processing applications require to look-up data from remote data sources eg: databases,
+ * web-services, RPC systems to process messages in the stream. Such access to adjunct datasets can be
+ * naturally modeled as a join between the incoming stream and a {@link RemoteReadableTable}.
+ * <p>
+ * Example use-cases include:
+ * <ul>
+ * <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li>
+ * <li> Scoring page views with impressions services. </li>
+ * <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
+ * </ul>
+ * <p>
+ * A {@link RemoteReadableTable} is used with a {@link TableReadFunction}, which encapsulates the
+ * functionality of reading data from the remote service; its sub-class {@link RemoteReadWriteTable}
+ * additionally uses a {@link TableWriteFunction} for writing. These functions provide a pluggable means
+ * to specify I/O operations on the table. This class delegates reads to the read function, records
+ * metrics for single-record reads and, when the given {@link RateLimiter} supports
+ * {@link RemoteTableDescriptor#RL_READ_TAG}, throttles them.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
+  protected final String tableId;
+  protected final Logger logger;
+  protected final TableReadFunction<K, V> readFn;
+  // Simple name of the runtime class, so a sub-class reports its metrics under its own name
+  protected final String groupName;
+  protected final RateLimiter rateLimiter;
+  protected final CreditFunction<K, V> readCreditFn;
+  protected final boolean rateLimitReads;
+
+  // Metrics are created in init(), not in the constructor
+  protected Timer getNs;
+  protected Timer getThrottleNs;
+  protected Counter numGets;
+
+  /**
+   * Construct a RemoteReadableTable instance
+   * @param tableId table id; must be neither null nor empty
+   * @param readFn {@link TableReadFunction} for read operations; must not be null
+   * @param rateLimiter optional {@link RateLimiter} for throttling reads
+   * @param readCreditFn function returning a credit to be charged for rate limiting per record;
+   *                     when null, one credit is charged per read
+   */
+  public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn, RateLimiter rateLimiter,
+      CreditFunction<K, V> readCreditFn) {
+    Preconditions.checkArgument(tableId != null && !tableId.isEmpty(), "invalid table id");
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.tableId = tableId;
+    this.readFn = readFn;
+    this.rateLimiter = rateLimiter;
+    this.readCreditFn = readCreditFn;
+    this.groupName = getClass().getSimpleName();
+    this.logger = LoggerFactory.getLogger(groupName + tableId);
+    // Reads are throttled only if the rate limiter has credits configured for the read tag
+    this.rateLimitReads = rateLimiter != null && rateLimiter.getSupportedTags().contains(RL_READ_TAG);
+    logger.info("Rate limiting is {} for remote read operations", rateLimitReads ? "enabled" : "disabled");
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Registers the read timers and counter of this table in the metrics registry of the task.
+   * @throws NullPointerException if {@code taskContext} is null
+   */
+  @Override
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    // The table contract allows a null task context for global tables, but the metrics of this
+    // table come from the task's registry: fail with a clear message instead of a bare NPE
+    Preconditions.checkNotNull(taskContext, "null task context");
+    getNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-ns");
+    getThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-throttle-ns");
+    numGets = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-gets");
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The get is counted, throttled if read rate limiting is enabled, and the time spent in the
+   * read function is recorded. Any failure is logged and rethrown as a {@link SamzaException}.
+   */
+  @Override
+  public V get(K key) {
+    try {
+      numGets.inc();
+      if (rateLimitReads) {
+        throttle(key, null, RL_READ_TAG, readCreditFn, getThrottleNs);
+      }
+      long startNs = System.nanoTime();
+      V result = readFn.get(key);
+      getNs.update(System.nanoTime() - startNs);
+      return result;
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to get a record, key=%s", key);
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Delegates directly to the read function: unlike {@link #get(Object)}, no rate limiting is
+   * applied and no metrics are recorded.
+   * @throws SamzaException if the read function fails, returns null, or returns fewer records
+   *                        than there are distinct keys
+   */
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Map<K, V> result;
+    try {
+      result = readFn.getAll(keys);
+    } catch (Exception e) {
+      String errMsg = "Failed to get some records";
+      logger.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    }
+
+    if (result == null) {
+      String errMsg = String.format("Received null records, keys=%s", keys);
+      logger.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
+
+    // A map holds each key at most once, so compare against the number of distinct keys:
+    // comparing against keys.size() would reject a complete result whenever a key is repeated
+    long numDistinctKeys = keys.stream().distinct().count();
+    if (result.size() < numDistinctKeys) {
+      String errMsg = String.format("Received insufficient number of records (%d), keys=%s", result.size(), keys);
+      logger.error(errMsg);
+      throw new SamzaException(errMsg);
+    }
+
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Closes the read function.
+   */
+  @Override
+  public void close() {
+    readFn.close();
+  }
+
+  /**
+   * Throttle requests given a table record (key, value) with rate limiter and credit function.
+   * The time spent acquiring the credits is recorded in the given timer. The rate limiter is
+   * dereferenced unconditionally, so this must only be called when a rate limiter is present.
+   * @param key key of the table record (nullable)
+   * @param value value of the table record (nullable)
+   * @param tag tag for rate limiter
+   * @param creditFn mapper function from KV to credits to be charged; when null, one credit is charged
+   * @param timer timer metric to track throttling delays
+   */
+  protected void throttle(K key, V value, String tag, CreditFunction<K, V> creditFn, Timer timer) {
+    long startNs = System.nanoTime();
+    int credits = (creditFn == null) ? 1 : creditFn.apply(KV.of(key, value));
+    rateLimiter.acquire(Collections.singletonMap(tag, credits));
+    timer.update(System.nanoTime() - startNs);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
new file mode 100644
index 0000000..7bc369d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.EmbeddedTaggedRateLimiter;
+import org.apache.samza.util.RateLimiter;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Table descriptor for remote store backed tables. It collects the read/write functions and the
+ * rate limiting settings of a remote table and serializes them into the configuration of the
+ * {@link TableSpec} returned by {@link #getTableSpec()}.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
+  /**
+   * Tag used to provision credits for rate limiting read operations from the remote table.
+   * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}
+   */
+  public static final String RL_READ_TAG = "readTag";
+
+  /**
+   * Tag used to provision credits for rate limiting write operations into the remote table.
+   * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
+   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}
+   * and it needs the write functionality.
+   */
+  public static final String RL_WRITE_TAG = "writeTag";
+
+  // Input support for a specific remote store (required)
+  private TableReadFunction<K, V> readFn;
+
+  // Output support for a specific remote store (optional)
+  private TableWriteFunction<K, V> writeFn;
+
+  // Custom rate limiter for client-side throttling, set only through withRateLimiter();
+  // mutually exclusive with the rates in tagCreditsMap (see validate())
+  private RateLimiter rateLimiter;
+
+  // Credits per second keyed by tag; when non-empty, the default rate limiter is constructed from it
+  private Map<String, Integer> tagCreditsMap = new HashMap<>();
+
+  private CreditFunction<K, V> readCreditFn;
+  private CreditFunction<K, V> writeCreditFn;
+
+  /**
+   * Construct a table descriptor instance
+   * @param tableId Id of the table
+   */
+  public RemoteTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Validates this descriptor and generates the table spec, with the read function and, when
+   * present, the write function, the rate limiter and the credit functions serialized as Base64
+   * strings in its configuration. The descriptor itself is not modified, so this method can be
+   * invoked repeatedly.
+   * @return the table spec of the remote table
+   */
+  @Override
+  public TableSpec getTableSpec() {
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    // Serialize and store reader/writer functions
+    tableSpecConfig.put(RemoteTableProvider.READ_FN, serializeObject("read function", readFn));
+
+    if (writeFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_FN, serializeObject("write function", writeFn));
+    }
+
+    // Serialize the rate limiter if specified. The default rate limiter is kept in a local:
+    // storing it in the rateLimiter field would make validate() reject the next invocation,
+    // as both a rate limiter instance and read/write limits would then be set.
+    RateLimiter effectiveRateLimiter = tagCreditsMap.isEmpty()
+        ? rateLimiter
+        : new EmbeddedTaggedRateLimiter(tagCreditsMap);
+
+    if (effectiveRateLimiter != null) {
+      tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, serializeObject("rate limiter", effectiveRateLimiter));
+    }
+
+    // Serialize the readCredit and writeCredit functions
+    if (readCreditFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, serializeObject(
+          "read credit function", readCreditFn));
+    }
+
+    if (writeCreditFn != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, serializeObject(
+          "write credit function", writeCreditFn));
+    }
+
+    return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  /**
+   * Use specified TableReadFunction with remote table.
+   * @param readFn read function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.readFn = readFn;
+    return this;
+  }
+
+  /**
+   * Use specified TableWriteFunction with remote table.
+   * @param writeFn write function instance
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    this.writeFn = writeFn;
+    return this;
+  }
+
+  /**
+   * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
+   * of credits to be charged from the rate limiter for table read and write operations.
+   * This is an advanced API that provides greater flexibility to throttle each record in the table
+   * with different number of credits. For most common use-cases eg: limit the number of read/write
+   * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
+   * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
+   *
+   * @param rateLimiter rate limiter instance to be used for throttling; must not be null
+   * @param readCreditFn credit function for rate limiting read operations
+   * @param writeCreditFn credit function for rate limiting write operations
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter, CreditFunction<K, V> readCreditFn,
+      CreditFunction<K, V> writeCreditFn) {
+    Preconditions.checkNotNull(rateLimiter, "null read rate limiter");
+    this.rateLimiter = rateLimiter;
+    this.readCreditFn = readCreditFn;
+    this.writeCreditFn = writeCreditFn;
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table read operations. If the read rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for read operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
+    tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Specify the rate limit for table write operations. If the write rate limit is set with this method
+   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, CreditFunction, CreditFunction)}
+   * and vice versa.
+   * @param creditsPerSec rate limit for write operations; must be positive
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
+    Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
+    tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
+    return this;
+  }
+
+  /**
+   * Helper method to serialize Java objects as Base64 strings
+   * @param name name of the object (for error reporting)
+   * @param object object to be serialized
+   * @return Base64 representation of the object
+   * @throws SamzaException if the object cannot be serialized
+   */
+  private <T> String serializeObject(String name, T object) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(object);
+      // The bytes are read before the stream is closed, so push out anything still buffered
+      oos.flush();
+      return Base64.getEncoder().encodeToString(baos.toByteArray());
+    } catch (IOException e) {
+      throw new SamzaException("Failed to serialize " + name, e);
+    }
+  }
+
+  /**
+   * Checks, in addition to the validation of the super class, that a read function has been
+   * specified and that a rate limiter instance and read/write rate limits are not both set.
+   */
+  @Override
+  protected void validate() {
+    super.validate();
+    Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+    Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
+        "Only one of rateLimiter instance or read/write limits can be specified");
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
new file mode 100644
index 0000000..8b9001a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provider for remote table instances; it rebuilds the table's functions and rate limiter from Base64-encoded serialized objects in the table spec config.
+ */
+public class RemoteTableProvider implements TableProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteTableProvider.class);
+
+ static final String READ_FN = "io.readFn"; // config key of the serialized TableReadFunction
+ static final String WRITE_FN = "io.writeFn"; // config key of the serialized TableWriteFunction
+ static final String RATE_LIMITER = "io.ratelimiter"; // config key of the serialized RateLimiter
+ static final String READ_CREDIT_FN = "io.readCreditFn"; // config key of the serialized read CreditFunction
+ static final String WRITE_CREDIT_FN = "io.writeCreditFn"; // config key of the serialized write CreditFunction
+
+ private final TableSpec tableSpec;
+ private final boolean readOnly;
+ private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
+ private SamzaContainerContext containerContext;
+ private TaskContext taskContext;
+
+ public RemoteTableProvider(TableSpec tableSpec) {
+ this.tableSpec = tableSpec;
+ readOnly = !tableSpec.getConfig().containsKey(WRITE_FN); // no write function in the config means a read-only table
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+ this.containerContext = containerContext;
+ this.taskContext = taskContext;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Table getTable() {
+ RemoteReadableTable table; // NOTE(review): raw type; K/V cannot be recovered from the serialized config
+ TableReadFunction<?, ?> readFn = getReadFn();
+ RateLimiter rateLimiter = deserializeObject(RATE_LIMITER); // null when the config has no rate limiter entry
+ if (rateLimiter != null) {
+ rateLimiter.init(containerContext.config, taskContext);
+ }
+ CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN);
+ if (readOnly) {
+ table = new RemoteReadableTable(tableSpec.getId(), readFn, rateLimiter, readCreditFn);
+ } else {
+ CreditFunction<?, ?> writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
+ table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), rateLimiter, readCreditFn, writeCreditFn);
+ }
+ table.init(containerContext, taskContext);
+ tables.add(table); // remembered so close() can close every table created here
+ return table;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Map<String, String> generateConfig(Map<String, String> config) { // NOTE(review): the incoming config argument is not read in this method
+ Map<String, String> tableConfig = new HashMap<>();
+
+ // Insert table_id prefix to config entries
+ tableSpec.getConfig().forEach((k, v) -> {
+ String realKey = String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
+ tableConfig.put(realKey, v);
+ });
+
+ LOG.info("Generated configuration for table " + tableSpec.getId());
+
+ return tableConfig;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ tables.forEach(t -> t.close());
+ }
+
+ private <T> T deserializeObject(String key) {
+ String entry = tableSpec.getConfig().getOrDefault(key, ""); // a missing key is treated the same as an empty value
+ if (entry.isEmpty()) {
+ return null;
+ }
+
+ try {
+ byte [] bytes = Base64.getDecoder().decode(entry);
+ return (T) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject(); // NOTE(review): unchecked cast, and the ObjectInputStream is never closed; try-with-resources would be cleaner
+ } catch (Exception e) {
+ String errMsg = "Failed to deserialize " + key;
+ throw new SamzaException(errMsg, e); // decode, I/O and class-loading failures all surface as SamzaException
+ }
+ }
+
+ private TableReadFunction<?, ?> getReadFn() {
+ TableReadFunction<?, ?> readFn = deserializeObject(READ_FN);
+ if (readFn != null) {
+ readFn.init(containerContext.config, taskContext); // initialised with the context captured in init()
+ }
+ return readFn;
+ }
+
+ private TableWriteFunction<?, ?> getWriteFn() {
+ TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN);
+ if (writeFn != null) {
+ writeFn.init(containerContext.config, taskContext); // initialised with the context captured in init()
+ }
+ return writeFn;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
new file mode 100644
index 0000000..0eb88fd
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProviderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory class for a remote table provider; each call creates a new {@link RemoteTableProvider} for the given table spec.
+ */
+public class RemoteTableProviderFactory implements TableProviderFactory {
+ @Override
+ public TableProvider getTableProvider(TableSpec tableSpec) {
+ Preconditions.checkNotNull(tableSpec, "null table spec"); // fail fast instead of failing later inside the provider
+ return new RemoteTableProvider(tableSpec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
new file mode 100644
index 0000000..dbd386c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+
+
+/**
+ * A function object to be used with a {@link RemoteReadableTable} implementation. It encapsulates the functionality
+ * of reading table record(s) for a provided set of key(s).
+ *
+ * <p> Instances of {@link TableReadFunction} are meant to be serializable, i.e. any non-serializable state
+ * (e.g. network sockets) should be marked as transient and recreated inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableReadFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+ /**
+ * Fetch single table record for a specified {@code key}. This method must be thread-safe.
+ * @param key key for the table record
+ * @return table record for the specified {@code key}
+ */
+ V get(K key);
+
+ /**
+ * Fetch the table {@code records} for the specified {@code keys}. This method must be thread-safe.
+ * @param keys keys for the table records
+ * @return records for the specified keys. The default implementation calls
+ * {@link TableReadFunction#get(Object)} once per key and maps every key to whatever it returned, so a
+ * failure in {@code get} propagates as an exception; overriding implementations may return a subset.
+ */
+ default Map<K, V> getAll(Collection<K> keys) {
+ Map<K, V> records = new HashMap<>();
+ keys.forEach(k -> records.put(k, get(k)));
+ return records;
+ }
+
+ // optionally implement readObject() to initialize transient states
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
new file mode 100644
index 0000000..3fb8fda
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ClosableFunction;
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.storage.kv.Entry;
+
+
+/**
+ * A function object to be used with a {@link RemoteReadWriteTable} implementation. It encapsulates the functionality
+ * of writing table record(s) for a provided set of key(s) to the store.
+ *
+ * <p> Instances of {@link TableWriteFunction} are meant to be serializable, i.e. any non-serializable state
+ * (e.g. network sockets) should be marked as transient and recreated inside readObject().
+ *
+ * <p> Implementations are expected to be thread-safe.
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+ /**
+ * Store single table {@code record} with specified {@code key}. This method must be thread-safe.
+ * @param key key for the table record
+ * @param record table record to be written
+ */
+ void put(K key, V record);
+
+ /**
+ * Store the given table {@code records}; the default implementation calls {@code put} once per entry. This method must be thread-safe.
+ * @param records table records to be written, each entry carrying its own key and value
+ */
+ default void putAll(List<Entry<K, V>> records) {
+ records.forEach(e -> put(e.getKey(), e.getValue()));
+ }
+
+ /**
+ * Delete the {@code record} with specified {@code key} from the remote store
+ * @param key key to the table record to be deleted
+ */
+ void delete(K key);
+
+ /**
+ * Delete all {@code records} with the specified {@code keys} from the remote store; the default implementation calls {@code delete} once per key
+ * @param keys keys for the table records to be deleted
+ */
+ default void deleteAll(Collection<K> keys) {
+ keys.stream().forEach(k -> delete(k));
+ }
+
+ /**
+ * Flush the remote store (optional); the default implementation does nothing
+ */
+ default void flush() {
+ }
+
+ // optionally implement readObject() to initialize transient states
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java
deleted file mode 100644
index 9ccf2f4..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedRateLimiter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * An embedded rate limiter
- */
-public class EmbeddedRateLimiter implements RateLimiter {
-
- static final private Logger LOGGER = LoggerFactory.getLogger(EmbeddedRateLimiter.class);
-
- private final int targetRate;
- private com.google.common.util.concurrent.RateLimiter rateLimiter;
-
- public EmbeddedRateLimiter(int creditsPerSecond) {
- this.targetRate = creditsPerSecond;
- }
-
- @Override
- public void acquire(int numberOfCredits) {
- ensureInitialized();
- rateLimiter.acquire(numberOfCredits);
- }
-
- @Override
- public int acquire(int numberOfCredits, long timeout, TimeUnit unit) {
- ensureInitialized();
- return rateLimiter.tryAcquire(numberOfCredits, timeout, unit)
- ? numberOfCredits
- : 0;
- }
-
- @Override
- public int tryAcquire(int numberOfCredits) {
- ensureInitialized();
- return rateLimiter.tryAcquire(numberOfCredits)
- ? numberOfCredits
- : 0;
- }
-
- @Override
- public void acquire(Map<String, Integer> tagToCreditsMap) {
- throw new IllegalArgumentException("This method is not applicable");
- }
-
- @Override
- public Map<String, Integer> acquire(Map<String, Integer> tagToCreditsMap, long timeout, TimeUnit unit) {
- throw new IllegalArgumentException("This method is not applicable");
- }
-
- @Override
- public Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditsMap) {
- throw new IllegalArgumentException("This method is not applicable");
- }
-
- @Override
- public void init(Config config, TaskContext taskContext) {
- int effectiveRate = targetRate;
- if (taskContext != null) {
- effectiveRate /= taskContext.getSamzaContainerContext().taskNames.size();
- LOGGER.info(String.format("Effective rate limit for task %s is %d",
- taskContext.getTaskName(), effectiveRate));
- }
- this.rateLimiter = com.google.common.util.concurrent.RateLimiter.create(effectiveRate);
- }
-
- private void ensureInitialized() {
- Preconditions.checkState(rateLimiter != null, "Not initialized");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
index 9c20eee..1cf9a9c 100644
--- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
@@ -20,6 +20,7 @@ package org.apache.samza.util;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -36,17 +37,25 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
- * An embedded rate limiter that supports tags
+ * An embedded rate limiter that supports tags. For simple use cases, a default tag is used when the user
+ * specifies only a single rate.
*/
public class EmbeddedTaggedRateLimiter implements RateLimiter {
-
static final private Logger LOGGER = LoggerFactory.getLogger(EmbeddedTaggedRateLimiter.class);
+ private static final String DEFAULT_TAG = "default-tag";
+ private static final Map<String, Integer> DEFAULT_TAG_MAP = Collections.singletonMap(DEFAULT_TAG, 0);
private final Map<String, Integer> tagToTargetRateMap;
private Map<String, com.google.common.util.concurrent.RateLimiter> tagToRateLimiterMap;
+ private boolean initialized;
+
+ public EmbeddedTaggedRateLimiter(int creditsPerSecond) {
+ this(Collections.singletonMap(DEFAULT_TAG, creditsPerSecond));
+ }
public EmbeddedTaggedRateLimiter(Map<String, Integer> tagToCreditsPerSecondMap) {
Preconditions.checkArgument(tagToCreditsPerSecondMap.size() > 0, "Map of tags can't be empty");
+ tagToCreditsPerSecondMap.values().forEach(c -> Preconditions.checkArgument(c >= 0, "Credits must be non-negative"));
this.tagToTargetRateMap = tagToCreditsPerSecondMap;
}
@@ -72,39 +81,28 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter {
int availableCredits = rateLimiter.tryAcquire(requiredCredits, remainingTimeoutInNanos, NANOSECONDS)
? requiredCredits
: 0;
- return new ImmutablePair<String, Integer>(tag, availableCredits);
+ return new ImmutablePair<>(tag, availableCredits);
})
.collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue));
}
@Override
- public Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditsMap) {
- ensureTagsAreValid(tagToCreditsMap);
- return tagToCreditsMap.entrySet().stream()
- .map(e -> {
- String tag = e.getKey();
- int requiredCredits = e.getValue();
- int availableCredits = tagToRateLimiterMap.get(tag).tryAcquire(requiredCredits)
- ? requiredCredits
- : 0;
- return new ImmutablePair<String, Integer>(tag, availableCredits);
- })
- .collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue));
+ public Set<String> getSupportedTags() {
+ return Collections.unmodifiableSet(tagToRateLimiterMap.keySet());
}
@Override
public void acquire(int numberOfCredits) {
- throw new IllegalArgumentException("This method is not applicable");
+ ensureTagsAreValid(DEFAULT_TAG_MAP);
+ tagToRateLimiterMap.get(DEFAULT_TAG).acquire(numberOfCredits);
}
@Override
public int acquire(int numberOfCredit, long timeout, TimeUnit unit) {
- throw new IllegalArgumentException("This method is not applicable");
- }
-
- @Override
- public int tryAcquire(int numberOfCredit) {
- throw new IllegalArgumentException("This method is not applicable");
+ ensureTagsAreValid(DEFAULT_TAG_MAP);
+ return tagToRateLimiterMap.get(DEFAULT_TAG).tryAcquire(numberOfCredit, timeout, unit)
+ ? numberOfCredit
+ : 0;
}
@Override
@@ -118,15 +116,15 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter {
LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d",
taskContext.getTaskName(), tag, effectiveRate));
}
- return new ImmutablePair<String, com.google.common.util.concurrent.RateLimiter>(
- tag, com.google.common.util.concurrent.RateLimiter.create(effectiveRate));
+ return new ImmutablePair<>(tag, com.google.common.util.concurrent.RateLimiter.create(effectiveRate));
})
.collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue))
);
+ initialized = true;
}
private void ensureInitialized() {
- Preconditions.checkState(tagToRateLimiterMap != null, "Not initialized");
+ Preconditions.checkState(initialized, "Not initialized");
}
private void ensureTagsAreValid(Map<String, ?> tagMap) {
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 789d75b..7cc1924 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -538,9 +538,8 @@ object SamzaContainer extends Logging {
new SystemClock)
val tableManager = new TableManager(config, serdes.asJava)
- tableManager.initLocalTables(taskStores.asJava)
- info("Got table manager");
+ info("Got table manager")
val systemStreamPartitions = taskModel
.getSystemStreamPartitions
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index cb73c5d..3ac37c6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -100,7 +100,7 @@ class TaskInstance(
if (tableManager != null) {
debug("Starting table manager for taskName: %s" format taskName)
- tableManager.start
+ tableManager.init(containerContext, context)
} else {
debug("Skipping table manager initialization for taskName: %s" format taskName)
}
@@ -244,7 +244,7 @@ class TaskInstance(
if (tableManager != null) {
debug("Shutting down table manager for taskName: %s" format taskName)
- tableManager.shutdown
+ tableManager.close
} else {
debug("Skipping table manager shutdown for taskName: %s" format taskName)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index df5b9e5..24178d0 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -27,11 +27,13 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.task.TaskContext;
import org.junit.Test;
import junit.framework.Assert;
@@ -49,13 +51,13 @@ public class TestTableManager {
public static class DummyTableProviderFactory implements TableProviderFactory {
- static Table table;
- static LocalStoreBackedTableProvider tableProvider;
+ static ReadableTable table;
+ static TableProvider tableProvider;
@Override
public TableProvider getTableProvider(TableSpec tableSpec) {
- table = mock(Table.class);
- tableProvider = mock(LocalStoreBackedTableProvider.class);
+ table = mock(ReadableTable.class);
+ tableProvider = mock(TableProvider.class);
when(tableProvider.getTable()).thenReturn(table);
return tableProvider;
}
@@ -120,10 +122,10 @@ public class TestTableManager {
});
TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
- tableManager.initLocalTables(storageEngines);
+ tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class));
Table table = tableManager.getTable(TABLE_ID);
- verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject());
+ verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
Assert.assertEquals(DummyTableProviderFactory.table, table);
Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tables");
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
new file mode 100644
index 0000000..acf3d61
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.EmbeddedTaggedRateLimiter;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
+import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRemoteTableDescriptor {
+ private void doTestSerialize(RateLimiter rateLimiter,
+ CreditFunction readCredFn,
+ CreditFunction writeCredFn) {
+ RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+ desc.withReadFunction(mock(TableReadFunction.class));
+ desc.withWriteFunction(mock(TableWriteFunction.class));
+ if (rateLimiter != null) {
+ desc.withRateLimiter(rateLimiter, readCredFn, writeCredFn);
+ } else {
+ desc.withReadRateLimit(100); // rate-only path: no explicit limiter instance is supplied
+ desc.withWriteRateLimit(200);
+ }
+ TableSpec spec = desc.getTableSpec();
+ Assert.assertTrue(spec.getConfig().containsKey(RemoteTableProvider.RATE_LIMITER)); // a limiter entry is expected on both paths
+ Assert.assertEquals(readCredFn != null, spec.getConfig().containsKey(RemoteTableProvider.READ_CREDIT_FN)); // credit functions appear in the config only when supplied
+ Assert.assertEquals(writeCredFn != null, spec.getConfig().containsKey(RemoteTableProvider.WRITE_CREDIT_FN));
+ }
+
+ @Test
+ public void testSerializeSimple() {
+ doTestSerialize(null, null, null);
+ }
+
+ @Test
+ public void testSerializeWithLimiter() {
+ doTestSerialize(mock(RateLimiter.class), null, null);
+ }
+
+ @Test
+ public void testSerializeWithLimiterAndReadCredFn() {
+ doTestSerialize(mock(RateLimiter.class), kv -> 1, null);
+ }
+
+ @Test
+ public void testSerializeWithLimiterAndWriteCredFn() {
+ doTestSerialize(mock(RateLimiter.class), null, kv -> 1);
+ }
+
+ @Test
+ public void testSerializeWithLimiterAndReadWriteCredFns() {
+ doTestSerialize(mock(RateLimiter.class), kv -> 1, kv -> 1);
+ }
+
+ @Test
+ public void testSerializeNullWriteFunction() {
+ RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+ desc.withReadFunction(mock(TableReadFunction.class));
+ TableSpec spec = desc.getTableSpec();
+ Assert.assertTrue(spec.getConfig().containsKey(RemoteTableProvider.READ_FN));
+ Assert.assertFalse(spec.getConfig().containsKey(RemoteTableProvider.WRITE_FN)); // no write function set, so no write entry in the config
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testSerializeNullReadFunction() {
+ RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+ TableSpec spec = desc.getTableSpec(); // expected to throw because no read function was set
+ Assert.assertTrue(spec.getConfig().containsKey(RemoteTableProvider.READ_FN)); // NOTE(review): presumably never reached, since the line above should already throw
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSpecifyBothRateAndRateLimiter() {
+ RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
+ desc.withReadFunction(mock(TableReadFunction.class));
+ desc.withReadRateLimit(100);
+ desc.withRateLimiter(mock(RateLimiter.class), null, null);
+ desc.getTableSpec(); // a rate plus an explicit limiter is the invalid combination under test
+ }
+
+ private TaskContext createMockTaskContext() {
+ MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+ doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
+ doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
+ TaskContext taskContext = mock(TaskContext.class);
+ doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+ SamzaContainerContext containerCtx = new SamzaContainerContext(
+ "1", null, Collections.singleton(new TaskName("MyTask")), null); // real container context with a single task name
+ doReturn(containerCtx).when(taskContext).getSamzaContainerContext();
+ return taskContext;
+ }
+
+ static class CountingCreditFunction<K, V> implements CreditFunction<K, V> {
+ int numCalls = 0; // incremented on every apply() so tests can assert how often it was invoked
+ @Override
+ public Integer apply(KV<K, V> kv) {
+ numCalls++;
+ return 1;
+ }
+ }
+
+ private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) {
+ int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0); // number of operation kinds (read, write) that are rate limited
+ RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1");
+ desc.withReadFunction(mock(TableReadFunction.class));
+ desc.withWriteFunction(mock(TableWriteFunction.class));
+ if (rateOnly) {
+ if (rlGets) {
+ desc.withReadRateLimit(1000);
+ }
+ if (rlPuts) {
+ desc.withWriteRateLimit(2000);
+ }
+ } else {
+ if (numRateLimitOps > 0) {
+ Map<String, Integer> tagCredits = new HashMap<>();
+ if (rlGets) {
+ tagCredits.put(RL_READ_TAG, 1000);
+ }
+ if (rlPuts) {
+ tagCredits.put(RL_WRITE_TAG, 2000);
+ }
+
+ // Spy the rate limiter to verify call count
+ RateLimiter rateLimiter = spy(new EmbeddedTaggedRateLimiter(tagCredits));
+ desc.withRateLimiter(rateLimiter, new CountingCreditFunction(), new CountingCreditFunction());
+ }
+ }
+
+ TableSpec spec = desc.getTableSpec();
+ RemoteTableProvider provider = new RemoteTableProvider(spec);
+ provider.init(mock(SamzaContainerContext.class), createMockTaskContext());
+ Table table = provider.getTable();
+ Assert.assertTrue(table instanceof RemoteReadWriteTable); // a write function was set, so a read-write table is expected
+ RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
+ Assert.assertNotNull(rwTable.readFn);
+ Assert.assertNotNull(rwTable.writeFn);
+ if (numRateLimitOps > 0) {
+ Assert.assertNotNull(rwTable.rateLimiter);
+ }
+
+ // Verify rate limiter usage
+ if (numRateLimitOps > 0) {
+ rwTable.get("xxx");
+ rwTable.put("yyy", "zzz");
+
+ if (!rateOnly) {
+ verify(rwTable.rateLimiter, times(numRateLimitOps)).acquire(anyMap()); // one tagged acquire is expected per rate-limited operation kind
+
+ CountingCreditFunction<?, ?> readCreditFn = (CountingCreditFunction<?, ?>) rwTable.readCreditFn;
+ CountingCreditFunction<?, ?> writeCreditFn = (CountingCreditFunction<?, ?>) rwTable.writeCreditFn;
+
+ Assert.assertNotNull(readCreditFn);
+ Assert.assertNotNull(writeCreditFn);
+
+ Assert.assertEquals(readCreditFn.numCalls, rlGets ? 1 : 0); // NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments are swapped here
+ Assert.assertEquals(writeCreditFn.numCalls, rlPuts ? 1 : 0); // NOTE(review): same swapped (expected, actual) order
+ } else {
+ Assert.assertTrue(rwTable.rateLimiter instanceof EmbeddedTaggedRateLimiter); // the rate-only path is expected to build the embedded tagged limiter
+ Assert.assertEquals(rwTable.rateLimiter.getSupportedTags().size(), numRateLimitOps); // NOTE(review): swapped (expected, actual) order
+ if (rlGets) {
+ Assert.assertTrue(rwTable.rateLimiter.getSupportedTags().contains(RL_READ_TAG));
+ }
+ if (rlPuts) {
+ Assert.assertTrue(rwTable.rateLimiter.getSupportedTags().contains(RL_WRITE_TAG));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testDeserializeReadFunctionNoRateLimit() {
+ doTestDeserializeReadFunctionAndLimiter(false, false, false);
+ }
+
+ @Test
+ public void testDeserializeReadFunctionAndLimiterWrite() {
+ doTestDeserializeReadFunctionAndLimiter(false, false, true);
+ }
+
+ @Test
+ public void testDeserializeReadFunctionAndLimiterRead() {
+ doTestDeserializeReadFunctionAndLimiter(false, true, false);
+ }
+
+ @Test
+ public void testDeserializeReadFunctionAndLimiterReadWrite() {
+ doTestDeserializeReadFunctionAndLimiter(false, true, true);
+ }
+
+ @Test
+ public void testDeserializeReadFunctionAndLimiterRateOnlyWrite() {
+ doTestDeserializeReadFunctionAndLimiter(true, false, true);
+ }
+
+ @Test
+ public void testDeserializeReadFunctionAndLimiterRateOnlyRead() {
+ doTestDeserializeReadFunctionAndLimiter(true, true, false);
+ }
+
+ @Test
+ public void testDeserializeReadFunctionAndLimiterRateOnlyReadWrite() {
+ doTestDeserializeReadFunctionAndLimiter(true, true, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2be7061d/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java
deleted file mode 100644
index 1b3f687..0000000
--- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedRateLimiter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestEmbeddedRateLimiter {
-
- final static private int TEST_INTERVAL = 200; // ms
- final static private int TARGET_RATE = 4000;
- final static private int NUMBER_OF_TASKS = 2;
- final static private int TARGET_RATE_PER_TASK = TARGET_RATE / NUMBER_OF_TASKS;
- final static private int INCREMENT = 2;
-
- @Test
- public void testAcquire() {
- RateLimiter rateLimiter = new EmbeddedRateLimiter(TARGET_RATE);
- initRateLimiter(rateLimiter);
-
- int count = 0;
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < TEST_INTERVAL) {
- rateLimiter.acquire(INCREMENT);
- count += INCREMENT;
- }
-
- long rate = count * 1000 / TEST_INTERVAL;
- verifyRate(rate);
- }
-
- @Test
- public void testTryAcquire() {
- RateLimiter rateLimiter = new EmbeddedRateLimiter(TARGET_RATE);
- initRateLimiter(rateLimiter);
-
- boolean hasSeenZeros = false;
-
- int count = 0;
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < TEST_INTERVAL) {
- int availableCredits = rateLimiter.tryAcquire(INCREMENT);
- if (availableCredits <= 0) {
- hasSeenZeros = true;
- } else {
- count += INCREMENT;
- }
- }
-
- long rate = count * 1000 / TEST_INTERVAL;
- verifyRate(rate);
- Assert.assertTrue(hasSeenZeros);
- }
-
- @Test
- public void testAcquireWithTimeout() {
- RateLimiter rateLimiter = new EmbeddedRateLimiter(TARGET_RATE);
- initRateLimiter(rateLimiter);
-
- boolean hasSeenZeros = false;
-
- int count = 0;
- int callCount = 0;
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < TEST_INTERVAL) {
- ++callCount;
- int availableCredits = rateLimiter.acquire(INCREMENT, 20, MILLISECONDS);
- if (availableCredits <= 0) {
- hasSeenZeros = true;
- } else {
- count += INCREMENT;
- }
- }
-
- long rate = count * 1000 / TEST_INTERVAL;
- verifyRate(rate);
- Assert.assertTrue(Math.abs(callCount - TARGET_RATE_PER_TASK * TEST_INTERVAL / 1000 / INCREMENT) <= 2);
- Assert.assertFalse(hasSeenZeros);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testFailsWhenUninitialized() {
- new EmbeddedRateLimiter(100).acquire(1);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testFailsWhenUsingTags() {
- RateLimiter rateLimiter = new EmbeddedRateLimiter(10);
- initRateLimiter(rateLimiter);
- Map<String, Integer> tagToCredits = new HashMap<>();
- tagToCredits.put("red", 1);
- tagToCredits.put("green", 1);
- rateLimiter.acquire(tagToCredits);
- }
-
- private void verifyRate(long rate) {
- // As the actual rate would likely not be exactly the same as target rate, the calculation below
- // verifies the actual rate is within 5% of the target rate per task
- Assert.assertTrue(Math.abs(rate - TARGET_RATE_PER_TASK) <= TARGET_RATE_PER_TASK * 5 / 100);
- }
-
- static void initRateLimiter(RateLimiter rateLimiter) {
- Config config = mock(Config.class);
- TaskContext taskContext = mock(TaskContext.class);
- SamzaContainerContext containerContext = mockSamzaContainerContext();
- when(taskContext.getSamzaContainerContext()).thenReturn(containerContext);
- rateLimiter.init(config, taskContext);
- }
-
- static SamzaContainerContext mockSamzaContainerContext() {
- try {
- Collection<String> taskNames = mock(Collection.class);
- when(taskNames.size()).thenReturn(NUMBER_OF_TASKS);
- SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
- Field taskNamesField = SamzaContainerContext.class.getDeclaredField("taskNames");
- taskNamesField.setAccessible(true);
- taskNamesField.set(containerContext, taskNames);
- taskNamesField.setAccessible(false);
- return containerContext;
- } catch (Exception ex) {
- throw new SamzaException(ex);
- }
- }
-}