You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/12/17 23:11:35 UTC
[2/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and
ReadWriteTable
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
deleted file mode 100644
index 044fab4..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestLocalReadWriteTable {
-
- public static final String TABLE_ID = "t1";
-
- private Timer putNs;
- private Timer putAllNs;
- private Timer deleteNs;
- private Timer deleteAllNs;
- private Timer flushNs;
- private Counter numPuts;
- private Counter numPutAlls;
- private Counter numDeletes;
- private Counter numDeleteAlls;
- private Counter numFlushes;
- private Timer putCallbackNs;
- private Timer deleteCallbackNs;
-
- private MetricsRegistry metricsRegistry;
-
- private KeyValueStore kvStore;
-
- @Before
- public void setUp() {
-
- putNs = new Timer("");
- putAllNs = new Timer("");
- deleteNs = new Timer("");
- deleteAllNs = new Timer("");
- flushNs = new Timer("");
- numPuts = new Counter("");
- numPutAlls = new Counter("");
- numDeletes = new Counter("");
- numDeleteAlls = new Counter("");
- numFlushes = new Counter("");
- putCallbackNs = new Timer("");
- deleteCallbackNs = new Timer("");
-
- metricsRegistry = mock(MetricsRegistry.class);
- String groupName = LocalReadWriteTable.class.getSimpleName();
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
-
- kvStore = mock(KeyValueStore.class);
- }
-
- @Test
- public void testPut() throws Exception {
- ReadWriteTable table = createTable(false);
- table.put("k1", "v1");
- table.putAsync("k2", "v2").get();
- table.putAsync("k3", null).get();
- verify(kvStore, times(2)).put(any(), any());
- verify(kvStore, times(1)).delete(any());
- Assert.assertEquals(2, numPuts.getCount());
- Assert.assertEquals(1, numDeletes.getCount());
- Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
- Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeleteAlls.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testPutAll() throws Exception {
- ReadWriteTable table = createTable(false);
- List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
- table.putAll(entries);
- table.putAllAsync(entries).get();
- verify(kvStore, times(2)).putAll(any());
- verify(kvStore, times(2)).deleteAll(any());
- Assert.assertEquals(2, numPutAlls.getCount());
- Assert.assertEquals(2, numDeleteAlls.getCount());
- Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
- Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numDeletes.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testDelete() throws Exception {
- ReadWriteTable table = createTable(false);
- table.delete("");
- table.deleteAsync("").get();
- verify(kvStore, times(2)).delete(any());
- Assert.assertEquals(2, numDeletes.getCount());
- Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeleteAlls.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testDeleteAll() throws Exception {
- ReadWriteTable table = createTable(false);
- table.deleteAll(Collections.emptyList());
- table.deleteAllAsync(Collections.emptyList()).get();
- verify(kvStore, times(2)).deleteAll(any());
- Assert.assertEquals(2, numDeleteAlls.getCount());
- Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeletes.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testFlush() {
- ReadWriteTable table = createTable(false);
- table.flush();
- table.flush();
- verify(kvStore, times(2)).flush();
- Assert.assertEquals(2, numFlushes.getCount());
- Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeletes.getCount());
- Assert.assertEquals(0, numDeleteAlls.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testTimerDisabled() throws Exception {
- ReadWriteTable table = createTable(true);
- table.put("", "");
- table.putAsync("", "").get();
- table.putAll(Collections.emptyList());
- table.putAllAsync(Collections.emptyList()).get();
- table.delete("");
- table.deleteAsync("").get();
- table.deleteAll(Collections.emptyList());
- table.deleteAllAsync(Collections.emptyList()).get();
- table.flush();
- Assert.assertEquals(1, numFlushes.getCount());
- Assert.assertEquals(2, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(2, numDeletes.getCount());
- Assert.assertEquals(2, numDeleteAlls.getCount());
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- private LocalReadWriteTable createTable(boolean isTimerDisabled) {
- Map<String, String> config = new HashMap<>();
- if (isTimerDisabled) {
- config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
- }
- Context context = mock(Context.class);
- JobContext jobContext = mock(JobContext.class);
- when(context.getJobContext()).thenReturn(jobContext);
- when(jobContext.getConfig()).thenReturn(new MapConfig(config));
- ContainerContext containerContext = mock(ContainerContext.class);
- when(context.getContainerContext()).thenReturn(containerContext);
- when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
- LocalReadWriteTable table = new LocalReadWriteTable("t1", kvStore);
- table.init(context);
-
- return table;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
deleted file mode 100644
index e1c82d9..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadableTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-public class TestLocalReadableTable {
-
- public static final String TABLE_ID = "t1";
-
- private List<String> keys;
- private Map<String, String> values;
-
- private Timer getNs;
- private Timer getAllNs;
- private Counter numGets;
- private Counter numGetAlls;
- private Timer getCallbackNs;
- private Counter numMissedLookups;
-
- private MetricsRegistry metricsRegistry;
-
- private KeyValueStore kvStore;
-
- @Before
- public void setUp() {
- keys = Arrays.asList("k1", "k2", "k3");
-
- values = new HashMap<>();
- values.put("k1", "v1");
- values.put("k2", "v2");
- values.put("k3", null);
-
- kvStore = mock(KeyValueStore.class);
- when(kvStore.get("k1")).thenReturn("v1");
- when(kvStore.get("k2")).thenReturn("v2");
- when(kvStore.getAll(keys)).thenReturn(values);
-
- getNs = new Timer("");
- getAllNs = new Timer("");
- numGets = new Counter("");
- numGetAlls = new Counter("");
- getCallbackNs = new Timer("");
- numMissedLookups = new Counter("");
-
- metricsRegistry = mock(MetricsRegistry.class);
- String groupName = LocalReadableTable.class.getSimpleName();
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
- }
-
- @Test
- public void testGet() throws Exception {
- ReadableTable table = createTable(false);
- Assert.assertEquals("v1", table.get("k1"));
- Assert.assertEquals("v2", table.getAsync("k2").get());
- Assert.assertNull(table.get("k3"));
- verify(kvStore, times(3)).get(any());
- Assert.assertEquals(3, numGets.getCount());
- Assert.assertEquals(1, numMissedLookups.getCount());
- Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, numGetAlls.getCount());
- Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testGetAll() throws Exception {
- ReadableTable table = createTable(false);
- Assert.assertEquals(values, table.getAll(keys));
- Assert.assertEquals(values, table.getAllAsync(keys).get());
- verify(kvStore, times(2)).getAll(any());
- Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
- Assert.assertEquals(2, numMissedLookups.getCount());
- Assert.assertEquals(3, numGetAlls.getCount());
- Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, numGets.getCount());
- Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testTimerDisabled() throws Exception {
- ReadableTable table = createTable(true);
- table.get("");
- table.getAsync("").get();
- table.getAll(keys);
- table.getAllAsync(keys).get();
- Assert.assertEquals(2, numGets.getCount());
- Assert.assertEquals(4, numMissedLookups.getCount());
- Assert.assertEquals(2, numGetAlls.getCount());
- Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- private LocalReadableTable createTable(boolean isTimerDisabled) {
- Map<String, String> config = new HashMap<>();
- if (isTimerDisabled) {
- config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
- }
- Context context = mock(Context.class);
- JobContext jobContext = mock(JobContext.class);
- when(context.getJobContext()).thenReturn(jobContext);
- when(jobContext.getConfig()).thenReturn(new MapConfig(config));
- ContainerContext containerContext = mock(ContainerContext.class);
- when(context.getContainerContext()).thenReturn(containerContext);
- when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
- LocalReadableTable table = new LocalReadableTable("t1", kvStore);
- table.init(context);
-
- return table;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
index 5367931..263ab56 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
@@ -19,7 +19,6 @@
package org.apache.samza.storage.kv;
-import junit.framework.Assert;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
@@ -28,6 +27,7 @@ import org.apache.samza.context.TaskContext;
import org.apache.samza.table.TableProvider;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Test;
+import org.junit.Assert;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
new file mode 100644
index 0000000..0fd4539
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestLocalTableRead {
+
+ public static final String TABLE_ID = "t1";
+
+ private List<String> keys;
+ private Map<String, String> values;
+
+ private Timer getNs;
+ private Timer getAllNs;
+ private Counter numGets;
+ private Counter numGetAlls;
+ private Timer getCallbackNs;
+ private Counter numMissedLookups;
+
+ private MetricsRegistry metricsRegistry;
+
+ private KeyValueStore kvStore;
+
+ @Before
+ public void setUp() {
+ keys = Arrays.asList("k1", "k2", "k3");
+
+ values = new HashMap<>();
+ values.put("k1", "v1");
+ values.put("k2", "v2");
+ values.put("k3", null);
+
+ kvStore = mock(KeyValueStore.class);
+ when(kvStore.get("k1")).thenReturn("v1");
+ when(kvStore.get("k2")).thenReturn("v2");
+ when(kvStore.getAll(keys)).thenReturn(values);
+
+ getNs = new Timer("");
+ getAllNs = new Timer("");
+ numGets = new Counter("");
+ numGetAlls = new Counter("");
+ getCallbackNs = new Timer("");
+ numMissedLookups = new Counter("");
+
+ metricsRegistry = mock(MetricsRegistry.class);
+ String groupName = LocalTable.class.getSimpleName();
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ ReadWriteTable table = createTable(false);
+ Assert.assertEquals("v1", table.get("k1"));
+ Assert.assertEquals("v2", table.getAsync("k2").get());
+ Assert.assertNull(table.get("k3"));
+ verify(kvStore, times(3)).get(any());
+ Assert.assertEquals(3, numGets.getCount());
+ Assert.assertEquals(1, numMissedLookups.getCount());
+ Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, numGetAlls.getCount());
+ Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testGetAll() throws Exception {
+ ReadWriteTable table = createTable(false);
+ Assert.assertEquals(values, table.getAll(keys));
+ Assert.assertEquals(values, table.getAllAsync(keys).get());
+ verify(kvStore, times(2)).getAll(any());
+ Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
+ Assert.assertEquals(2, numMissedLookups.getCount());
+ Assert.assertEquals(3, numGetAlls.getCount());
+ Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, numGets.getCount());
+ Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testTimerDisabled() throws Exception {
+ ReadWriteTable table = createTable(true);
+ table.get("");
+ table.getAsync("").get();
+ table.getAll(keys);
+ table.getAllAsync(keys).get();
+ Assert.assertEquals(2, numGets.getCount());
+ Assert.assertEquals(4, numMissedLookups.getCount());
+ Assert.assertEquals(2, numGetAlls.getCount());
+ Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ private LocalTable createTable(boolean isTimerDisabled) {
+ Map<String, String> config = new HashMap<>();
+ if (isTimerDisabled) {
+ config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+ }
+ Context context = mock(Context.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(context.getJobContext()).thenReturn(jobContext);
+ when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+ ContainerContext containerContext = mock(ContainerContext.class);
+ when(context.getContainerContext()).thenReturn(containerContext);
+ when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+ LocalTable table = new LocalTable("t1", kvStore);
+ table.init(context);
+
+ return table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
new file mode 100644
index 0000000..80eb99f
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadWriteTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalTableWrite {
+
+ public static final String TABLE_ID = "t1";
+
+ private Timer putNs;
+ private Timer putAllNs;
+ private Timer deleteNs;
+ private Timer deleteAllNs;
+ private Timer flushNs;
+ private Counter numPuts;
+ private Counter numPutAlls;
+ private Counter numDeletes;
+ private Counter numDeleteAlls;
+ private Counter numFlushes;
+ private Timer putCallbackNs;
+ private Timer deleteCallbackNs;
+
+ private MetricsRegistry metricsRegistry;
+
+ private KeyValueStore kvStore;
+
+ @Before
+ public void setUp() {
+
+ putNs = new Timer("");
+ putAllNs = new Timer("");
+ deleteNs = new Timer("");
+ deleteAllNs = new Timer("");
+ flushNs = new Timer("");
+ numPuts = new Counter("");
+ numPutAlls = new Counter("");
+ numDeletes = new Counter("");
+ numDeleteAlls = new Counter("");
+ numFlushes = new Counter("");
+ putCallbackNs = new Timer("");
+ deleteCallbackNs = new Timer("");
+
+ metricsRegistry = mock(MetricsRegistry.class);
+ String groupName = LocalTable.class.getSimpleName();
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
+
+ kvStore = mock(KeyValueStore.class);
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ ReadWriteTable table = createTable(false);
+ table.put("k1", "v1");
+ table.putAsync("k2", "v2").get();
+ table.putAsync("k3", null).get();
+ verify(kvStore, times(2)).put(any(), any());
+ verify(kvStore, times(1)).delete(any());
+ Assert.assertEquals(2, numPuts.getCount());
+ Assert.assertEquals(1, numDeletes.getCount());
+ Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
+ Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeleteAlls.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testPutAll() throws Exception {
+ ReadWriteTable table = createTable(false);
+ List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
+ table.putAll(entries);
+ table.putAllAsync(entries).get();
+ verify(kvStore, times(2)).putAll(any());
+ verify(kvStore, times(2)).deleteAll(any());
+ Assert.assertEquals(2, numPutAlls.getCount());
+ Assert.assertEquals(2, numDeleteAlls.getCount());
+ Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
+ Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numDeletes.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ ReadWriteTable table = createTable(false);
+ table.delete("");
+ table.deleteAsync("").get();
+ verify(kvStore, times(2)).delete(any());
+ Assert.assertEquals(2, numDeletes.getCount());
+ Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeleteAlls.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testDeleteAll() throws Exception {
+ ReadWriteTable table = createTable(false);
+ table.deleteAll(Collections.emptyList());
+ table.deleteAllAsync(Collections.emptyList()).get();
+ verify(kvStore, times(2)).deleteAll(any());
+ Assert.assertEquals(2, numDeleteAlls.getCount());
+ Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeletes.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testFlush() {
+ ReadWriteTable table = createTable(false);
+ table.flush();
+ table.flush();
+ verify(kvStore, times(2)).flush();
+ Assert.assertEquals(2, numFlushes.getCount());
+ Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeletes.getCount());
+ Assert.assertEquals(0, numDeleteAlls.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testTimerDisabled() throws Exception {
+ ReadWriteTable table = createTable(true);
+ table.put("", "");
+ table.putAsync("", "").get();
+ table.putAll(Collections.emptyList());
+ table.putAllAsync(Collections.emptyList()).get();
+ table.delete("");
+ table.deleteAsync("").get();
+ table.deleteAll(Collections.emptyList());
+ table.deleteAllAsync(Collections.emptyList()).get();
+ table.flush();
+ Assert.assertEquals(1, numFlushes.getCount());
+ Assert.assertEquals(2, numPuts.getCount());
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(2, numDeletes.getCount());
+ Assert.assertEquals(2, numDeleteAlls.getCount());
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ private LocalTable createTable(boolean isTimerDisabled) {
+ Map<String, String> config = new HashMap<>();
+ if (isTimerDisabled) {
+ config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+ }
+ Context context = mock(Context.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(context.getJobContext()).thenReturn(jobContext);
+ when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+ ContainerContext containerContext = mock(ContainerContext.class);
+ when(context.getContainerContext()).thenReturn(containerContext);
+ when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+ LocalTable table = new LocalTable("t1", kvStore);
+ table.init(context);
+
+ return table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 2137a46..ab650f2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -229,7 +229,7 @@ public class StreamTaskIntegrationTest {
@Override
public void init(Context context) throws Exception {
- profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store");
+ profileViewTable = context.getTaskContext().getTable("profile-view-store");
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
deleted file mode 100644
index b447493..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.context.Context;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * This test class tests sendTo() and join() for local tables
- */
-public class TestLocalTable extends AbstractIntegrationTestHarness {
-
- @Test
- public void testSendTo() throws Exception {
-
- int count = 10;
- Profile[] profiles = TestTableData.generateProfiles(count);
-
- int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.Profile.samza.system", "test");
- configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
- MyMapFunction mapFn = new MyMapFunction();
-
- final StreamApplication app = appDesc -> {
-
- Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
- KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
- appDesc.getInputStream(isd)
- .map(mapFn)
- .sendTo(table);
- };
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- for (int i = 0; i < partitionCount; i++) {
- MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
- assertEquals(count, mapFnCopy.received.size());
- mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
- }
- }
-
- static class StreamTableJoinApp implements StreamApplication {
- static List<PageView> received = new LinkedList<>();
- static List<EnrichedPageView> joined = new LinkedList<>();
-
- @Override
- public void describe(StreamApplicationDescriptor appDesc) {
- Table<KV<Integer, Profile>> table = appDesc.getTable(
- new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
- appDesc.getInputStream(profileISD)
- .map(m -> new KV(m.getMemberId(), m))
- .sendTo(table);
-
- GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDesc.getInputStream(pageViewISD)
- .map(pv -> {
- received.add(pv);
- return pv;
- })
- .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
- .join(table, new PageViewToProfileJoinFunction())
- .sink((m, collector, coordinator) -> joined.add(m));
- }
- }
-
- @Test
- public void testStreamTableJoin() throws Exception {
-
- int count = 10;
- PageView[] pageViews = TestTableData.generatePageViews(count);
- Profile[] profiles = TestTableData.generateProfiles(count);
-
- int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.Profile.samza.system", "test");
- configs.put("streams.Profile.samza.bootstrap", "true");
- configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
- assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
- assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
- }
-
- static class DualStreamTableJoinApp implements StreamApplication {
- static List<Profile> sentToProfileTable1 = new LinkedList<>();
- static List<Profile> sentToProfileTable2 = new LinkedList<>();
- static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
- static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
-
- @Override
- public void describe(StreamApplicationDescriptor appDesc) {
- KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
- KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
-
- PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
- PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
-
- Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
-
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
- GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
- MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
- MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
-
- profileStream1
- .map(m -> {
- sentToProfileTable1.add(m);
- return new KV(m.getMemberId(), m);
- })
- .sendTo(profileTable);
- profileStream2
- .map(m -> {
- sentToProfileTable2.add(m);
- return new KV(m.getMemberId(), m);
- })
- .sendTo(profileTable);
-
- GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
- GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
- MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
- MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
-
- pageViewStream1
- .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
- .join(profileTable, joinFn1)
- .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
-
- pageViewStream2
- .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
- .join(profileTable, joinFn2)
- .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
- }
- }
-
- @Test
- public void testDualStreamTableJoin() throws Exception {
-
- int count = 10;
- PageView[] pageViews = TestTableData.generatePageViews(count);
- Profile[] profiles = TestTableData.generateProfiles(count);
-
- int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.Profile1.samza.system", "test");
- configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile1.samza.bootstrap", "true");
- configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.Profile2.samza.system", "test");
- configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile2.samza.bootstrap", "true");
- configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.PageView1.samza.system", "test");
- configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.PageView2.samza.system", "test");
- configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
- assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
- assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
- assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
- assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
- assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
- }
-
- static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
- Map<String, String> configs = new HashMap<>();
- configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
-
- configs.put(JobConfig.JOB_NAME(), "test-table-job");
- configs.put(JobConfig.PROCESSOR_ID(), "1");
- configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
- configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
-
- // For intermediate streams
- configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
- configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
- configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
- configs.put("systems.kafka.samza.key.serde", "int");
- configs.put("systems.kafka.samza.msg.serde", "json");
- configs.put("systems.kafka.default.stream.replication.factor", "1");
- configs.put("job.default.system", "kafka");
-
- configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
- configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
-
- return configs;
- }
-
- private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
- private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
-
- private transient List<Profile> received;
- private transient ReadableTable table;
-
- @Override
- public void init(Context context) {
- table = (ReadableTable) context.getTaskContext().getTable("t1");
- this.received = new ArrayList<>();
-
- taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
- }
-
- @Override
- public KV<Integer, Profile> apply(Profile profile) {
- received.add(profile);
- return new KV(profile.getMemberId(), profile);
- }
-
- public static MyMapFunction getMapFunctionByTask(String taskName) {
- return taskToMapFunctionMap.get(taskName);
- }
- }
-
- @Test
- public void testWithLowLevelApi() throws Exception {
-
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
- configs.put("streams.PageView.partitionCount", String.valueOf(4));
- configs.put("task.inputs", "test.PageView");
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
- executeRun(runner, config);
- runner.waitForFinish();
- }
-
- static public class MyTaskApplication implements TaskApplication {
- @Override
- public void describe(TaskApplicationDescriptor appDescriptor) {
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDescriptor
- .withInputStream(pageViewISD)
- .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
- .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
- }
- }
-
- static public class MyStreamTask implements StreamTask, InitableTask {
- private ReadWriteTable<Integer, PageView> pageViewTable;
- @Override
- public void init(Context context) throws Exception {
- pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1");
- }
- @Override
- public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
- PageView pv = (PageView) message.getMessage();
- pageViewTable.put(pv.getMemberId(), pv);
- PageView pv2 = pageViewTable.get(pv.getMemberId());
- Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
- Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
new file mode 100644
index 0000000..0303c26
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.ArraySystemFactory;
+import org.apache.samza.test.util.Base64Serializer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * This test class tests sendTo() and join() for local tables
+ */
+public class TestLocalTableEndToEnd extends AbstractIntegrationTestHarness {
+
+ @Test
+ public void testSendTo() throws Exception {
+
+ int count = 10;
+ Profile[] profiles = TestTableData.generateProfiles(count);
+
+ int partitionCount = 4;
+ Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+ configs.put("streams.Profile.samza.system", "test");
+ configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+ configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+ MyMapFunction mapFn = new MyMapFunction();
+
+ final StreamApplication app = appDesc -> {
+
+ Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
+ KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+ GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+
+ appDesc.getInputStream(isd)
+ .map(mapFn)
+ .sendTo(table);
+ };
+
+ Config config = new MapConfig(configs);
+ final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+ executeRun(runner, config);
+ runner.waitForFinish();
+
+ for (int i = 0; i < partitionCount; i++) {
+ MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
+ assertEquals(count, mapFnCopy.received.size());
+ mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
+ }
+ }
+
+ static class StreamTableJoinApp implements StreamApplication {
+ static List<PageView> received = new LinkedList<>();
+ static List<EnrichedPageView> joined = new LinkedList<>();
+
+ @Override
+ public void describe(StreamApplicationDescriptor appDesc) {
+ Table<KV<Integer, Profile>> table = appDesc.getTable(
+ new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+ GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+ appDesc.getInputStream(profileISD)
+ .map(m -> new KV(m.getMemberId(), m))
+ .sendTo(table);
+
+ GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ appDesc.getInputStream(pageViewISD)
+ .map(pv -> {
+ received.add(pv);
+ return pv;
+ })
+ .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+ .join(table, new PageViewToProfileJoinFunction())
+ .sink((m, collector, coordinator) -> joined.add(m));
+ }
+ }
+
+ @Test
+ public void testStreamTableJoin() throws Exception {
+
+ int count = 10;
+ PageView[] pageViews = TestTableData.generatePageViews(count);
+ Profile[] profiles = TestTableData.generateProfiles(count);
+
+ int partitionCount = 4;
+ Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+ configs.put("streams.PageView.samza.system", "test");
+ configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+ configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+
+ configs.put("streams.Profile.samza.system", "test");
+ configs.put("streams.Profile.samza.bootstrap", "true");
+ configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+ configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+ Config config = new MapConfig(configs);
+ final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
+ executeRun(runner, config);
+ runner.waitForFinish();
+
+ assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
+ assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
+ assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+ }
+
+ static class DualStreamTableJoinApp implements StreamApplication {
+ static List<Profile> sentToProfileTable1 = new LinkedList<>();
+ static List<Profile> sentToProfileTable2 = new LinkedList<>();
+ static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
+ static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
+
+ @Override
+ public void describe(StreamApplicationDescriptor appDesc) {
+ KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
+ KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
+
+ PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
+ PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
+
+ Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
+
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+ GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
+ GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+ MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
+ MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
+
+ profileStream1
+ .map(m -> {
+ sentToProfileTable1.add(m);
+ return new KV(m.getMemberId(), m);
+ })
+ .sendTo(profileTable);
+ profileStream2
+ .map(m -> {
+ sentToProfileTable2.add(m);
+ return new KV(m.getMemberId(), m);
+ })
+ .sendTo(profileTable);
+
+ GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
+ GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+ MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
+ MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
+
+ pageViewStream1
+ .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
+ .join(profileTable, joinFn1)
+ .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
+
+ pageViewStream2
+ .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
+ .join(profileTable, joinFn2)
+ .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
+ }
+ }
+
+ @Test
+ public void testDualStreamTableJoin() throws Exception {
+
+ int count = 10;
+ PageView[] pageViews = TestTableData.generatePageViews(count);
+ Profile[] profiles = TestTableData.generateProfiles(count);
+
+ int partitionCount = 4;
+ Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+ configs.put("streams.Profile1.samza.system", "test");
+ configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
+ configs.put("streams.Profile1.samza.bootstrap", "true");
+ configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
+
+ configs.put("streams.Profile2.samza.system", "test");
+ configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
+ configs.put("streams.Profile2.samza.bootstrap", "true");
+ configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
+
+ configs.put("streams.PageView1.samza.system", "test");
+ configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
+ configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
+
+ configs.put("streams.PageView2.samza.system", "test");
+ configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
+ configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
+
+ Config config = new MapConfig(configs);
+ final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
+ executeRun(runner, config);
+ runner.waitForFinish();
+
+ assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
+ assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
+
+ assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
+ assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
+ assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
+ assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+ }
+
+ static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
+
+ configs.put(JobConfig.JOB_NAME(), "test-table-job");
+ configs.put(JobConfig.PROCESSOR_ID(), "1");
+ configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+ configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+ // For intermediate streams
+ configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+ configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
+ configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
+ configs.put("systems.kafka.samza.key.serde", "int");
+ configs.put("systems.kafka.samza.msg.serde", "json");
+ configs.put("systems.kafka.default.stream.replication.factor", "1");
+ configs.put("job.default.system", "kafka");
+
+ configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
+ configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
+
+ return configs;
+ }
+
+ private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
+
+ private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+
+ private transient List<Profile> received;
+ private transient ReadWriteTable table;
+
+ @Override
+ public void init(Context context) {
+ table = context.getTaskContext().getTable("t1");
+ this.received = new ArrayList<>();
+
+ taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+ }
+
+ @Override
+ public KV<Integer, Profile> apply(Profile profile) {
+ received.add(profile);
+ return new KV(profile.getMemberId(), profile);
+ }
+
+ public static MyMapFunction getMapFunctionByTask(String taskName) {
+ return taskToMapFunctionMap.get(taskName);
+ }
+ }
+
+ @Test
+ public void testWithLowLevelApi() throws Exception {
+
+ Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+ configs.put("streams.PageView.samza.system", "test");
+ configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
+ configs.put("streams.PageView.partitionCount", String.valueOf(4));
+ configs.put("task.inputs", "test.PageView");
+
+ Config config = new MapConfig(configs);
+ final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
+ executeRun(runner, config);
+ runner.waitForFinish();
+ }
+
+ static public class MyTaskApplication implements TaskApplication {
+ @Override
+ public void describe(TaskApplicationDescriptor appDescriptor) {
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+ GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ appDescriptor
+ .withInputStream(pageViewISD)
+ .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
+ .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+ }
+ }
+
+ static public class MyStreamTask implements StreamTask, InitableTask {
+ private ReadWriteTable<Integer, PageView> pageViewTable;
+ @Override
+ public void init(Context context) throws Exception {
+ pageViewTable = context.getTaskContext().getTable("t1");
+ }
+ @Override
+ public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+ PageView pv = (PageView) message.getMessage();
+ pageViewTable.put(pv.getMemberId(), pv);
+ PageView pv2 = pageViewTable.get(pv.getMemberId());
+ Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
+ Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
deleted file mode 100644
index 3de8300..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import com.google.common.cache.CacheBuilder;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.descriptors.CachingTableDescriptor;
-import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
-import org.apache.samza.table.remote.RemoteReadableTable;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.remote.TableRateLimiter;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
-import org.apache.samza.util.RateLimiter;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.generatePageViews;
-import static org.apache.samza.test.table.TestTableData.generateProfiles;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.withSettings;
-
-
-public class TestRemoteTable extends AbstractIntegrationTestHarness {
-
- static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
-
- static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
- private final String serializedProfiles;
- private transient Map<Integer, Profile> profileMap;
-
- private InMemoryReadFunction(String profiles) {
- this.serializedProfiles = profiles;
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
- this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
- }
-
- @Override
- public CompletableFuture<Profile> getAsync(Integer key) {
- return CompletableFuture.completedFuture(profileMap.get(key));
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
-
- static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
- return new InMemoryReadFunction(serializedProfiles);
- }
- }
-
- static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
- private transient List<EnrichedPageView> records;
- private String testName;
-
- public InMemoryWriteFunction(String testName) {
- this.testName = testName;
- }
-
- // Verify serializable functionality
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
-
- // Write to the global list for verification
- records = writtenRecords.get(testName);
- }
-
- @Override
- public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
- records.add(record);
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> deleteAsync(Integer key) {
- records.remove(key);
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
- }
-
- private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
- CachingTableDescriptor<K, V> cachingDesc;
- if (defaultCache) {
- cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
- cachingDesc.withReadTtl(Duration.ofMinutes(5));
- cachingDesc.withWriteTtl(Duration.ofMinutes(5));
- } else {
- GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
- guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
- cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
- }
-
- return appDesc.getTable(cachingDesc);
- }
-
- static class MyReadFunction implements TableReadFunction {
- @Override
- public CompletableFuture getAsync(Object key) {
- return null;
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
- }
-
- private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
- final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
-
- writtenRecords.put(testName, new ArrayList<>());
-
- int count = 10;
- PageView[] pageViews = generatePageViews(count);
- String profiles = Base64Serializer.serialize(generateProfiles(count));
-
- int partitionCount = 4;
- Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
- final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
- final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
- final StreamApplication app = appDesc -> {
- RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
- inputTableDesc
- .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
- .withRateLimiter(readRateLimiter, null, null);
-
- // dummy reader
- TableReadFunction readFn = new MyReadFunction();
-
- RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
- outputTableDesc
- .withReadFunction(readFn)
- .withWriteFunction(writer)
- .withRateLimiter(writeRateLimiter, null, null);
-
- Table<KV<Integer, EnrichedPageView>> outputTable = withCache
- ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc)
- : appDesc.getTable(outputTableDesc);
-
- Table<KV<Integer, Profile>> inputTable = withCache
- ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc)
- : appDesc.getTable(inputTableDesc);
-
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDesc.getInputStream(isd)
- .map(pv -> new KV<>(pv.getMemberId(), pv))
- .join(inputTable, new PageViewToProfileJoinFunction())
- .map(m -> new KV(m.getMemberId(), m))
- .sendTo(outputTable);
- };
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- int numExpected = count * partitionCount;
- Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
- Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
- }
-
- @Test
- public void testStreamTableJoinRemoteTable() throws Exception {
- doTestStreamTableJoinRemoteTable(false, false, "testStreamTableJoinRemoteTable");
- }
-
- @Test
- public void testStreamTableJoinRemoteTableWithCache() throws Exception {
- doTestStreamTableJoinRemoteTable(true, false, "testStreamTableJoinRemoteTableWithCache");
- }
-
- @Test
- public void testStreamTableJoinRemoteTableWithDefaultCache() throws Exception {
- doTestStreamTableJoinRemoteTable(true, true, "testStreamTableJoinRemoteTableWithDefaultCache");
- }
-
- private Context createMockContext() {
- MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
- doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
- doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
- Context context = new MockContext();
- doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
- doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
- return context;
- }
-
- @Test(expected = SamzaException.class)
- public void testCatchReaderException() {
- TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
- CompletableFuture<String> future = new CompletableFuture<>();
- future.completeExceptionally(new RuntimeException("Expected test exception"));
- doReturn(future).when(reader).getAsync(anyString());
- TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
- RemoteReadableTable<String, ?> table = new RemoteReadableTable<>(
- "table1", reader, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
- table.init(createMockContext());
- table.get("abc");
- }
-
- @Test(expected = SamzaException.class)
- public void testCatchWriterException() {
- TableReadFunction<String, String> reader = mock(TableReadFunction.class);
- TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
- CompletableFuture<String> future = new CompletableFuture<>();
- future.completeExceptionally(new RuntimeException("Expected test exception"));
- doReturn(future).when(writer).putAsync(anyString(), any());
- TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
- RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<String, String>(
- "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
- table.init(createMockContext());
- table.put("abc", "efg");
- }
-}