You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/11/27 18:40:43 UTC
[1/2] samza git commit: SAMZA-2004: Add ability to disable table
metrics
Repository: samza
Updated Branches:
refs/heads/master e25e0dab9 -> 5069f1ddb
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
new file mode 100644
index 0000000..1f2d586
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadableTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestLocalReadableTable {
+
+ public static final String TABLE_ID = "t1";
+
+ private List<String> keys;
+ private Map<String, String> values;
+
+ private Timer getNs;
+ private Timer getAllNs;
+ private Counter numGets;
+ private Counter numGetAlls;
+ private Timer getCallbackNs;
+ private Counter numMissedLookups;
+
+ private MetricsRegistry metricsRegistry;
+
+ private KeyValueStore kvStore;
+
+ @Before
+ public void setUp() {
+ keys = Arrays.asList("k1", "k2", "k3");
+
+ values = new HashMap<>();
+ values.put("k1", "v1");
+ values.put("k2", "v2");
+ values.put("k3", null);
+
+ kvStore = mock(KeyValueStore.class);
+ when(kvStore.get("k1")).thenReturn("v1");
+ when(kvStore.get("k2")).thenReturn("v2");
+ when(kvStore.getAll(keys)).thenReturn(values);
+
+ getNs = new Timer("");
+ getAllNs = new Timer("");
+ numGets = new Counter("");
+ numGetAlls = new Counter("");
+ getCallbackNs = new Timer("");
+ numMissedLookups = new Counter("");
+
+ metricsRegistry = mock(MetricsRegistry.class);
+ String groupName = LocalReadableTable.class.getSimpleName();
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ ReadableTable table = createTable(false);
+ Assert.assertEquals("v1", table.get("k1"));
+ Assert.assertEquals("v2", table.getAsync("k2").get());
+ Assert.assertNull(table.get("k3"));
+ verify(kvStore, times(3)).get(any());
+ Assert.assertEquals(3, numGets.getCount());
+ Assert.assertEquals(1, numMissedLookups.getCount());
+ Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, numGetAlls.getCount());
+ Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testGetAll() throws Exception {
+ ReadableTable table = createTable(false);
+ Assert.assertEquals(values, table.getAll(keys));
+ Assert.assertEquals(values, table.getAllAsync(keys).get());
+ verify(kvStore, times(2)).getAll(any());
+ Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
+ Assert.assertEquals(2, numMissedLookups.getCount());
+ Assert.assertEquals(3, numGetAlls.getCount());
+ Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, numGets.getCount());
+ Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testTimerDisabled() throws Exception {
+ ReadableTable table = createTable(true);
+ table.get("");
+ table.getAsync("").get();
+ table.getAll(Collections.emptyList());
+ table.getAllAsync(Collections.emptyList()).get();
+ verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
+ verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
+ verify(metricsRegistry, times(0)).newGauge(anyString(), any());
+ Assert.assertEquals(2, numGets.getCount());
+ Assert.assertEquals(2, numMissedLookups.getCount());
+ Assert.assertEquals(2, numGetAlls.getCount());
+ Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ private LocalReadableTable createTable(boolean isTimerDisabled) {
+ Map<String, String> config = new HashMap<>();
+ if (isTimerDisabled) {
+ config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+ }
+ Context context = mock(Context.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(context.getJobContext()).thenReturn(jobContext);
+ when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+ ContainerContext containerContext = mock(ContainerContext.class);
+ when(context.getContainerContext()).thenReturn(containerContext);
+ when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+ LocalReadableTable table = new LocalReadableTable("t1", kvStore);
+ table.init(context);
+
+ return table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
new file mode 100644
index 0000000..5367931
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import junit.framework.Assert;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.context.TaskContext;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalTableProvider {
+
+ @Test
+ public void testInit() {
+ Context context = mock(Context.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(context.getJobContext()).thenReturn(jobContext);
+ when(jobContext.getConfig()).thenReturn(new MapConfig());
+ ContainerContext containerContext = mock(ContainerContext.class);
+ when(context.getContainerContext()).thenReturn(containerContext);
+ when(containerContext.getContainerMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+ TaskContext taskContext = mock(TaskContext.class);
+ when(context.getTaskContext()).thenReturn(taskContext);
+ when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
+
+ TableProvider tableProvider = createTableProvider("t1");
+ tableProvider.init(context);
+ Assert.assertNotNull(tableProvider.getTable());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testInitFail() {
+ TableProvider tableProvider = createTableProvider("t1");
+ Assert.assertNotNull(tableProvider.getTable());
+ }
+
+ private TableProvider createTableProvider(String tableId) {
+ return new LocalTableProvider(tableId) {
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
deleted file mode 100644
index 752b91e..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv.descriptors;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.storage.kv.LocalTableProvider;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.util.NoOpMetricsRegistry;
-import org.junit.Test;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-
-public class TestLocalTableProvider {
-
- @Test
- public void testInit() {
- Context context = mock(Context.class);
- TaskContext taskContext = mock(TaskContext.class);
- when(context.getTaskContext()).thenReturn(taskContext);
- when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
- when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
-
- TableProvider tableProvider = createTableProvider("t1");
- tableProvider.init(context);
- Assert.assertNotNull(tableProvider.getTable());
- }
-
- @Test(expected = NullPointerException.class)
- public void testInitFail() {
- TableProvider tableProvider = createTableProvider("t1");
- Assert.assertNotNull(tableProvider.getTable());
- }
-
- private TableProvider createTableProvider(String tableId) {
- return new LocalTableProvider(tableId) {
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 80cb789..89a32d8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -21,11 +21,9 @@ package org.apache.samza.sql.impl;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
-import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index fa279f2..e112804 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -20,11 +20,11 @@
package org.apache.samza.test.table;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
@@ -34,12 +34,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
@@ -49,9 +44,6 @@ import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.storage.kv.LocalReadWriteTable;
import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.table.ReadWriteTable;
@@ -65,6 +57,7 @@ import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.apache.samza.test.util.ArraySystemFactory;
import org.apache.samza.test.util.Base64Serializer;
+
import org.junit.Assert;
import org.junit.Test;
@@ -74,16 +67,9 @@ import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
import static org.apache.samza.test.table.TestTableData.Profile;
import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
/**
@@ -357,50 +343,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
}
@Test
- public void testAsyncOperation() throws Exception {
- KeyValueStore kvStore = mock(KeyValueStore.class);
- LocalReadWriteTable<String, String> table = new LocalReadWriteTable<>("table1", kvStore);
- Context context = mock(Context.class);
- TaskContext taskContext = mock(TaskContext.class);
- when(context.getTaskContext()).thenReturn(taskContext);
- MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
- doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
- doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
- doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
- doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry();
-
- table.init(context);
-
- // GET
- doReturn("bar").when(kvStore).get(anyString());
- Assert.assertEquals("bar", table.getAsync("foo").get());
-
- // GET-ALL
- Map<String, String> recordMap = new HashMap<>();
- recordMap.put("foo1", "bar1");
- recordMap.put("foo2", "bar2");
- doReturn(recordMap).when(kvStore).getAll(anyList());
- Assert.assertEquals(recordMap, table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
-
- // PUT
- table.putAsync("foo1", "bar1").get();
- verify(kvStore, times(1)).put(anyString(), anyString());
-
- // PUT-ALL
- List<Entry<String, String>> records = Arrays.asList(new Entry<>("foo1", "bar1"), new Entry<>("foo2", "bar2"));
- table.putAllAsync(records).get();
- verify(kvStore, times(1)).putAll(anyList());
-
- // DELETE
- table.deleteAsync("foo").get();
- verify(kvStore, times(1)).delete(anyString());
-
- // DELETE-ALL
- table.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
- verify(kvStore, times(1)).deleteAll(anyList());
- }
-
- @Test
public void testWithLowLevelApi() throws Exception {
Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index 8218b8b..c9228af 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -20,6 +20,7 @@
package org.apache.samza.test.table;
import com.google.common.cache.CacheBuilder;
+
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
@@ -33,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
@@ -60,6 +62,7 @@ import org.apache.samza.table.remote.TableWriteFunction;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.apache.samza.test.util.Base64Serializer;
import org.apache.samza.util.RateLimiter;
+
import org.junit.Assert;
import org.junit.Test;
@@ -68,6 +71,7 @@ import static org.apache.samza.test.table.TestTableData.PageView;
import static org.apache.samza.test.table.TestTableData.Profile;
import static org.apache.samza.test.table.TestTableData.generatePageViews;
import static org.apache.samza.test.table.TestTableData.generateProfiles;
+
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -248,7 +252,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
Context context = new MockContext();
- doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+ doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
+ doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
return context;
}
[2/2] samza git commit: SAMZA-2004: Add ability to disable table
metrics
Posted by we...@apache.org.
SAMZA-2004: Add ability to disable table metrics
For jobs with very high throughput, it is desirable to disable metrics on tables. We would introduce the option to disable all metrics for a table on table descriptor.
Author: Wei Song <ws...@linkedin.com>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #822 from weisong44/SAMZA-2004-2
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5069f1dd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5069f1dd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5069f1dd
Branch: refs/heads/master
Commit: 5069f1ddb74587ad6fb11d515bece45bd2a8f3bc
Parents: e25e0da
Author: Wei Song <ws...@linkedin.com>
Authored: Tue Nov 27 10:40:34 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Tue Nov 27 10:40:34 2018 -0800
----------------------------------------------------------------------
.../documentation/versioned/api/table-api.md | 1 +
.../apache/samza/config/JavaTableConfig.java | 2 +-
.../table/descriptors/BaseTableDescriptor.java | 13 +-
.../table/descriptors/LocalTableDescriptor.java | 3 -
.../table/descriptors/TableDescriptor.java | 9 -
.../samza/table/remote/TableRateLimiter.java | 4 +-
.../apache/samza/table/BaseReadableTable.java | 66 +++++
.../apache/samza/table/BaseTableProvider.java | 4 +-
.../samza/table/caching/CachingTable.java | 48 ++--
.../table/caching/guava/GuavaCacheTable.java | 9 +-
.../table/remote/RemoteReadWriteTable.java | 82 +++++--
.../samza/table/remote/RemoteReadableTable.java | 145 +++--------
.../table/utils/DefaultTableReadMetrics.java | 56 -----
.../table/utils/DefaultTableWriteMetrics.java | 64 -----
.../samza/table/utils/TableMetricsUtil.java | 14 +-
.../samza/table/utils/TableReadMetrics.java | 65 +++++
.../samza/table/utils/TableWriteMetrics.java | 77 ++++++
.../samza/config/TestJavaTableConfig.java | 23 +-
.../samza/table/caching/TestCachingTable.java | 90 ++++++-
.../table/remote/TestRemoteReadWriteTable.java | 2 +-
.../descriptors/TestRemoteTableDescriptor.java | 12 +-
.../samza/storage/kv/LocalReadWriteTable.java | 49 ++--
.../samza/storage/kv/LocalReadableTable.java | 46 ++--
.../storage/kv/TestLocalReadWriteTable.java | 244 +++++++++++++++++++
.../storage/kv/TestLocalReadableTable.java | 158 ++++++++++++
.../storage/kv/TestLocalTableProvider.java | 66 +++++
.../kv/descriptors/TestLocalTableProvider.java | 60 -----
.../sql/impl/ConfigBasedIOResolverFactory.java | 2 -
.../apache/samza/test/table/TestLocalTable.java | 64 +----
.../samza/test/table/TestRemoteTable.java | 7 +-
30 files changed, 995 insertions(+), 490 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/docs/learn/documentation/versioned/api/table-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md
index 0a9c33c..f53d9f5 100644
--- a/docs/learn/documentation/versioned/api/table-api.md
+++ b/docs/learn/documentation/versioned/api/table-api.md
@@ -248,6 +248,7 @@ The table below summarizes table metrics:
|`getAll-ns`|`ReadableTable`|Average latency of `getAll/getAllAsync()` operations|
|`num-gets`|`ReadableTable`|Count of `get/getAsync()` operations
|`num-getAlls`|`ReadableTable`|Count of `getAll/getAllAsync()` operations
+|`num-missed-lookups`|`ReadableTable`|Count of missed get/getAll() operations
|`put-ns`|`ReadWriteTable`|Average latency of `put/putAsync()` operations
|`putAll-ns`|`ReadWriteTable`|Average latency of `putAll/putAllAsync()` operations
|`num-puts`|`ReadWriteTable`|Count of `put/putAsync()` operations
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
index c4381a1..ee0045a 100644
--- a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -81,7 +81,7 @@ public class JavaTableConfig extends MapConfig {
* @param tableId Id of the table
* @return serde retistry key
*/
- public String getValueSerde(String tableId) {
+ public String getMsgSerde(String tableId) {
return get(String.format(STORE_MSG_SERDE, tableId), null);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
index d660276..26c2ae3 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -19,11 +19,11 @@
package org.apache.samza.table.descriptors;
+import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
@@ -34,7 +34,6 @@ import org.apache.samza.config.JavaTableConfig;
* @param <V> the type of the value in this table
* @param <D> the type of the concrete table descriptor
*/
-@InterfaceStability.Unstable
abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
implements TableDescriptor<K, V, D> {
@@ -50,7 +49,13 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
this.tableId = tableId;
}
- @Override
+ /**
+ * Add a configuration entry for the table
+ *
+ * @param key the key
+ * @param value the value
+ * @return this table descriptor instance
+ */
public D withConfig(String key, String value) {
config.put(key, value);
return (D) this;
@@ -64,6 +69,8 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
@Override
public Map<String, String> toConfig(Config jobConfig) {
+ Preconditions.checkNotNull(jobConfig, "Job config is null");
+
validate();
Map<String, String> tableConfig = new HashMap<>(config);
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index d194091..1ebb580 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -28,7 +28,6 @@ import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.table.utils.SerdeUtils;
@@ -143,8 +142,6 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
- JavaTableConfig javaTableConfig = new JavaTableConfig(jobConfig);
-
if (sideInputs != null && !sideInputs.isEmpty()) {
sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format(
"Side input stream %s doesn't confirm to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN)));
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
index 806d158..81a98e6 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
@@ -67,15 +67,6 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
String getTableId();
/**
- * Add a configuration entry for the table
- *
- * @param key the key
- * @param value the value
- * @return this table descriptor instance
- */
- D withConfig(String key, String value);
-
- /**
* Generate configuration for this table descriptor, the generated configuration
* should be the complete configuration for this table that can be directly
* included in the job configuration.
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
index c67a648..37d6385 100644
--- a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
@@ -122,7 +122,9 @@ public class TableRateLimiter<K, V> {
long startNs = System.nanoTime();
rateLimiter.acquire(Collections.singletonMap(tag, credits));
- waitTimeMetric.update(System.nanoTime() - startNs);
+ if (waitTimeMetric != null) {
+ waitTimeMetric.update(System.nanoTime() - startNs);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
new file mode 100644
index 0000000..7eaaa83
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.utils.TableReadMetrics;
+import org.apache.samza.table.utils.TableWriteMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for all readable tables
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
+
+ protected final Logger logger;
+
+ protected final String tableId;
+
+ protected TableReadMetrics readMetrics;
+ protected TableWriteMetrics writeMetrics;
+
+ /**
+ * Construct an instance
+ * @param tableId Id of the table
+ */
+ public BaseReadableTable(String tableId) {
+ Preconditions.checkArgument(tableId != null & !tableId.isEmpty(),
+ String.format("Invalid table Id: %s", tableId));
+ this.tableId = tableId;
+ this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId);
+ }
+
+ @Override
+ public void init(Context context) {
+ readMetrics = new TableReadMetrics(context, this, tableId);
+ if (this instanceof ReadWriteTable) {
+ writeMetrics = new TableWriteMetrics(context, this, tableId);
+ }
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
index 7ad423d..f2c0b08 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java
@@ -28,9 +28,9 @@ import org.slf4j.LoggerFactory;
*/
abstract public class BaseTableProvider implements TableProvider {
- final protected Logger logger = LoggerFactory.getLogger(getClass());
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
- final protected String tableId;
+ protected final String tableId;
protected Context context;
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index 703a6ff..ac6188b 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -23,10 +23,9 @@ import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadableTable;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.utils.DefaultTableReadMetrics;
-import org.apache.samza.table.utils.DefaultTableWriteMetrics;
import org.apache.samza.table.utils.TableMetricsUtil;
import java.util.ArrayList;
@@ -37,6 +36,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
/**
* A composite table incorporating a cache with a Samza table. The cache is
@@ -62,23 +64,20 @@ import java.util.stream.Collectors;
* @param <K> type of the table key
* @param <V> type of the table value
*/
-public class CachingTable<K, V> implements ReadWriteTable<K, V> {
- private final String tableId;
+public class CachingTable<K, V> extends BaseReadableTable<K, V>
+ implements ReadWriteTable<K, V> {
+
private final ReadableTable<K, V> rdTable;
private final ReadWriteTable<K, V> rwTable;
private final ReadWriteTable<K, V> cache;
private final boolean isWriteAround;
- // Metrics
- private DefaultTableReadMetrics readMetrics;
- private DefaultTableWriteMetrics writeMetrics;
-
// Common caching stats
private AtomicLong hitCount = new AtomicLong();
private AtomicLong missCount = new AtomicLong();
public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
- this.tableId = tableId;
+ super(tableId);
this.rdTable = table;
this.rwTable = table instanceof ReadWriteTable ? (ReadWriteTable) table : null;
this.cache = cache;
@@ -87,8 +86,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
@Override
public void init(Context context) {
- readMetrics = new DefaultTableReadMetrics(context, this, tableId);
- writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
+ super.init(context);
TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
tableMetricsUtil.newGauge("hit-rate", () -> hitRate());
tableMetricsUtil.newGauge("miss-rate", () -> missRate());
@@ -125,7 +123,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
@Override
public CompletableFuture<V> getAsync(K key) {
- readMetrics.numGets.inc();
+ incCounter(readMetrics.numGets);
V value = cache.get(key);
if (value != null) {
hitCount.incrementAndGet();
@@ -142,7 +140,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
if (result != null) {
cache.put(key, result);
}
- readMetrics.getNs.update(System.nanoTime() - startNs);
+ updateTimer(readMetrics.getNs, System.nanoTime() - startNs);
return result;
}
});
@@ -161,7 +159,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
@Override
public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
- readMetrics.numGetAlls.inc();
+ incCounter(readMetrics.numGetAlls);
// Make a copy of entries which might be immutable
Map<K, V> getAllResult = new HashMap<>();
List<K> missingKeys = lookupCache(keys, getAllResult);
@@ -181,7 +179,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
.collect(Collectors.toList()));
getAllResult.putAll(records);
}
- readMetrics.getAllNs.update(System.nanoTime() - startNs);
+ updateTimer(readMetrics.getAllNs, System.nanoTime() - startNs);
return getAllResult;
}
});
@@ -200,7 +198,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
@Override
public CompletableFuture<Void> putAsync(K key, V value) {
- writeMetrics.numPuts.inc();
+ incCounter(writeMetrics.numPuts);
Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
long startNs = System.nanoTime();
@@ -214,7 +212,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
cache.put(key, value);
}
}
- writeMetrics.putNs.update(System.nanoTime() - startNs);
+ updateTimer(writeMetrics.putNs, System.nanoTime() - startNs);
return result;
});
}
@@ -232,7 +230,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
@Override
public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
- writeMetrics.numPutAlls.inc();
+ incCounter(writeMetrics.numPutAlls);
long startNs = System.nanoTime();
Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
return rwTable.putAllAsync(records).handle((result, e) -> {
@@ -242,7 +240,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
cache.putAll(records);
}
- writeMetrics.putAllNs.update(System.nanoTime() - startNs);
+ updateTimer(writeMetrics.putAllNs, System.nanoTime() - startNs);
return result;
});
}
@@ -260,7 +258,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
@Override
public CompletableFuture<Void> deleteAsync(K key) {
- writeMetrics.numDeletes.inc();
+ incCounter(writeMetrics.numDeletes);
long startNs = System.nanoTime();
Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
return rwTable.deleteAsync(key).handle((result, e) -> {
@@ -269,7 +267,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
} else if (!isWriteAround) {
cache.delete(key);
}
- writeMetrics.deleteNs.update(System.nanoTime() - startNs);
+ updateTimer(writeMetrics.deleteNs, System.nanoTime() - startNs);
return result;
});
}
@@ -285,7 +283,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
@Override
public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
- writeMetrics.numDeleteAlls.inc();
+ incCounter(writeMetrics.numDeleteAlls);
long startNs = System.nanoTime();
Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
return rwTable.deleteAllAsync(keys).handle((result, e) -> {
@@ -294,18 +292,18 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
} else if (!isWriteAround) {
cache.deleteAll(keys);
}
- writeMetrics.deleteAllNs.update(System.nanoTime() - startNs);
+ updateTimer(writeMetrics.deleteAllNs, System.nanoTime() - startNs);
return result;
});
}
@Override
public synchronized void flush() {
- writeMetrics.numFlushes.inc();
+ incCounter(writeMetrics.numFlushes);
long startNs = System.nanoTime();
Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
rwTable.flush();
- writeMetrics.flushNs.update(System.nanoTime() - startNs);
+ updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index 391f068..b75a0bc 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -23,6 +23,7 @@ import com.google.common.cache.Cache;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadableTable;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.utils.TableMetricsUtil;
@@ -39,17 +40,19 @@ import java.util.concurrent.CompletableFuture;
* @param <K> type of the key in the cache
* @param <V> type of the value in the cache
*/
-public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
- private final String tableId;
+public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V>
+ implements ReadWriteTable<K, V> {
+
private final Cache<K, V> cache;
public GuavaCacheTable(String tableId, Cache<K, V> cache) {
- this.tableId = tableId;
+ super(tableId);
this.cache = cache;
}
@Override
public void init(Context context) {
+ super.init(context);
TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
// hit- and miss-rate are provided by CachingTable.
tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount());
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 60ac4b7..b96087b 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -21,11 +21,16 @@ package org.apache.samza.table.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.DefaultTableWriteMetrics;
import org.apache.samza.table.utils.TableMetricsUtil;
import java.util.List;
@@ -33,6 +38,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
/**
* Remote store backed read writable table
@@ -40,9 +48,8 @@ import java.util.stream.Collectors;
* @param <K> the type of the key in this table
* @param <V> the type of the value in this table
*/
-public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
-
- private DefaultTableWriteMetrics writeMetrics;
+public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
+ implements ReadWriteTable<K, V> {
protected final TableWriteFunction<K, V> writeFn;
protected final TableRateLimiter writeRateLimiter;
@@ -59,9 +66,11 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
@Override
public void init(Context context) {
super.init(context);
- writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
- writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+ MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+ if (metricsConfig.getMetricsTimerEnabled()) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+ writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+ }
}
@Override
@@ -80,8 +89,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
return deleteAsync(key);
}
- writeMetrics.numPuts.inc();
- return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.putNs)
+ return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.numPuts, writeMetrics.putNs)
.exceptionally(e -> {
throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
});
@@ -103,8 +111,6 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
return CompletableFuture.completedFuture(null);
}
- writeMetrics.numPutAlls.inc();
-
List<K> deleteKeys = records.stream()
.filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
@@ -117,7 +123,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
// Return the combined future
return CompletableFuture.allOf(
deleteFuture,
- executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.putAllNs))
+ executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.numPutAlls, writeMetrics.putAllNs))
.exceptionally(e -> {
String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
@@ -136,8 +142,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
@Override
public CompletableFuture<Void> deleteAsync(K key) {
Preconditions.checkNotNull(key);
- writeMetrics.numDeletes.inc();
- return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.deleteNs)
+ return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.numDeletes, writeMetrics.deleteNs)
.exceptionally(e -> {
throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
});
@@ -159,8 +164,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
return CompletableFuture.completedFuture(null);
}
- writeMetrics.numDeleteAlls.inc();
- return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.deleteAllNs)
+ return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs)
.exceptionally(e -> {
throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
});
@@ -169,10 +173,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
@Override
public void flush() {
try {
- writeMetrics.numFlushes.inc();
+ incCounter(writeMetrics.numFlushes);
long startNs = System.nanoTime();
writeFn.flush();
- writeMetrics.flushNs.update(System.nanoTime() - startNs);
+ updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
} catch (Exception e) {
String errMsg = "Failed to flush remote store";
logger.error(errMsg, e);
@@ -186,6 +190,48 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
super.close();
}
+ /**
+ * Execute an async request given a table record (key+value)
+ * @param rateLimiter helper for rate limiting
+ * @param key key of the table record
+ * @param value value of the table record
+ * @param method method to be executed
+ * @param timer latency metric to be updated
+ * @return CompletableFuture of the operation
+ */
+ protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
+ K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
+ incCounter(counter);
+ final long startNs = System.nanoTime();
+ CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
+ .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
+ .thenCompose((r) -> method.apply(key, value))
+ : method.apply(key, value);
+ return completeExecution(ioFuture, startNs, timer);
+ }
+
+ /**
+ * Execute an async request given a collection of table records
+ * @param rateLimiter helper for rate limiting
+ * @param records list of records
+ * @param method method to be executed
+ * @param timer latency metric to be updated
+ * @return CompletableFuture of the operation
+ */
+ protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
+ Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
+ Counter counter, Timer timer) {
+ incCounter(counter);
+ final long startNs = System.nanoTime();
+ CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
+ .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
+ .thenCompose((r) -> method.apply(records))
+ : method.apply(records);
+ return completeExecution(ioFuture, startNs, timer);
+ }
+
@VisibleForTesting
public TableWriteFunction<K, V> getWriteFn() {
return writeFn;
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index 1b6bfea..e02650e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -23,14 +23,12 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Objects;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.utils.DefaultTableReadMetrics;
+import org.apache.samza.table.BaseReadableTable;
import org.apache.samza.table.utils.TableMetricsUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -38,9 +36,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
import java.util.function.Function;
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
/**
* A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service.
@@ -71,18 +70,13 @@ import java.util.function.Function;
* @param <K> the type of the key in this table
* @param <V> the type of the value in this table
*/
-public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
-
- protected final String tableId;
- protected final Logger logger;
+public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
protected final ExecutorService callbackExecutor;
protected final ExecutorService tableExecutor;
protected final TableReadFunction<K, V> readFn;
protected final TableRateLimiter<K, V> readRateLimiter;
- private DefaultTableReadMetrics readMetrics;
-
/**
* Construct a RemoteReadableTable instance
* @param tableId table id
@@ -93,21 +87,22 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
*/
public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn,
TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, ExecutorService callbackExecutor) {
- Preconditions.checkArgument(tableId != null && !tableId.isEmpty(), "invalid table id");
+ super(tableId);
Preconditions.checkNotNull(readFn, "null read function");
- this.tableId = tableId;
this.readFn = readFn;
this.readRateLimiter = rateLimiter;
this.callbackExecutor = callbackExecutor;
this.tableExecutor = tableExecutor;
- this.logger = LoggerFactory.getLogger(getClass().getName() + "-" + tableId);
}
@Override
public void init(Context context) {
- readMetrics = new DefaultTableReadMetrics(context, this, tableId);
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
- readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+ super.init(context);
+ MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+ if (metricsConfig.getMetricsTimerEnabled()) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+ readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+ }
}
@Override
@@ -122,14 +117,13 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
@Override
public CompletableFuture<V> getAsync(K key) {
Preconditions.checkNotNull(key);
- readMetrics.numGets.inc();
- return execute(readRateLimiter, key, readFn::getAsync, readMetrics.getNs)
+ return execute(readRateLimiter, key, readFn::getAsync, readMetrics.numGets, readMetrics.getNs)
.handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to get the records for " + key, e);
}
if (result == null) {
- readMetrics.numMissedLookups.inc();
+ incCounter(readMetrics.numMissedLookups);
}
return result;
});
@@ -137,7 +131,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
@Override
public Map<K, V> getAll(List<K> keys) {
- readMetrics.numGetAlls.inc();
try {
return getAllAsync(keys).get();
} catch (Exception e) {
@@ -151,13 +144,12 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
}
- readMetrics.numGetAlls.inc();
- return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.getAllNs)
+ return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.numGetAlls, readMetrics.getAllNs)
.handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to get the records for " + keys, e);
}
- result.values().stream().filter(Objects::isNull).map(v -> readMetrics.numMissedLookups.inc());
+ result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
return result;
});
}
@@ -172,56 +164,15 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
* @return CompletableFuture of the operation
*/
protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
- K key, Function<K, CompletableFuture<T>> method, Timer timer) {
+ K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
+ incCounter(counter);
final long startNs = System.nanoTime();
- CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ?
- CompletableFuture
+ CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
.runAsync(() -> rateLimiter.throttle(key), tableExecutor)
- .thenCompose((r) -> method.apply(key)) :
- method.apply(key);
- if (callbackExecutor != null) {
- ioFuture.thenApplyAsync(r -> {
- timer.update(System.nanoTime() - startNs);
- return r;
- }, callbackExecutor);
- } else {
- ioFuture.thenApply(r -> {
- timer.update(System.nanoTime() - startNs);
- return r;
- });
- }
- return ioFuture;
- }
-
- /**
- * Execute an async request given a table record (key+value)
- * @param rateLimiter helper for rate limiting
- * @param key key of the table record
- * @param value value of the table record
- * @param method method to be executed
- * @param timer latency metric to be updated
- * @return CompletableFuture of the operation
- */
- protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
- K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Timer timer) {
- final long startNs = System.nanoTime();
- CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() ?
- CompletableFuture
- .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
- .thenCompose((r) -> method.apply(key, value)) :
- method.apply(key, value);
- if (callbackExecutor != null) {
- ioFuture.thenApplyAsync(r -> {
- timer.update(System.nanoTime() - startNs);
- return r;
- }, callbackExecutor);
- } else {
- ioFuture.thenApply(r -> {
- timer.update(System.nanoTime() - startNs);
- return r;
- });
- }
- return ioFuture;
+ .thenCompose((r) -> method.apply(key))
+ : method.apply(key);
+ return completeExecution(ioFuture, startNs, timer);
}
/**
@@ -234,54 +185,34 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
* @return CompletableFuture of the operation
*/
protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
- Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Timer timer) {
+ Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
+ incCounter(counter);
final long startNs = System.nanoTime();
- CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ?
- CompletableFuture
+ CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
.runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
- .thenCompose((r) -> method.apply(keys)) :
- method.apply(keys);
- if (callbackExecutor != null) {
- ioFuture.thenApplyAsync(r -> {
- timer.update(System.nanoTime() - startNs);
- return r;
- }, callbackExecutor);
- } else {
- ioFuture.thenApply(r -> {
- timer.update(System.nanoTime() - startNs);
- return r;
- });
- }
- return ioFuture;
+ .thenCompose((r) -> method.apply(keys))
+ : method.apply(keys);
+ return completeExecution(ioFuture, startNs, timer);
}
/**
- * Execute an async request given a collection of table records
- * @param rateLimiter helper for rate limiting
- * @param records list of records
- * @param method method to be executed
+ * Complete the pending execution and update timer
+ * @param ioFuture the future to be executed
+ * @param startNs start time in nanosecond
* @param timer latency metric to be updated
+ * @param <T> return type
* @return CompletableFuture of the operation
*/
- protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
- Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method, Timer timer) {
- final long startNs = System.nanoTime();
- CompletableFuture<Void> ioFuture;
- if (rateLimiter.isRateLimited()) {
- ioFuture = CompletableFuture
- .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
- .thenCompose((r) -> method.apply(records));
- } else {
- ioFuture = method.apply(records);
- }
+ protected <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
if (callbackExecutor != null) {
ioFuture.thenApplyAsync(r -> {
- timer.update(System.nanoTime() - startNs);
+ updateTimer(timer, System.nanoTime() - startNs);
return r;
}, callbackExecutor);
} else {
ioFuture.thenApply(r -> {
- timer.update(System.nanoTime() - startNs);
+ updateTimer(timer, System.nanoTime() - startNs);
return r;
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
deleted file mode 100644
index 525a0cb..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-/**
- * Utility class that contains the default set of read metrics.
- */
-public class DefaultTableReadMetrics {
-
- public final Timer getNs;
- public final Timer getAllNs;
- public final Counter numGets;
- public final Counter numGetAlls;
- public final Timer getCallbackNs;
- public final Counter numMissedLookups;
-
- /**
- * Constructor based on container and task container context
- *
- * @param context {@link Context} for this task
- * @param table underlying table
- * @param tableId table Id
- */
- public DefaultTableReadMetrics(Context context, Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
- getNs = tableMetricsUtil.newTimer("get-ns");
- getAllNs = tableMetricsUtil.newTimer("getAll-ns");
- numGets = tableMetricsUtil.newCounter("num-gets");
- numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
- getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns");
- numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
deleted file mode 100644
index 69d4ef2..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-public class DefaultTableWriteMetrics {
-
- public final Timer putNs;
- public final Timer putAllNs;
- public final Timer deleteNs;
- public final Timer deleteAllNs;
- public final Timer flushNs;
- public final Counter numPuts;
- public final Counter numPutAlls;
- public final Counter numDeletes;
- public final Counter numDeleteAlls;
- public final Counter numFlushes;
- public final Timer putCallbackNs;
- public final Timer deleteCallbackNs;
-
- /**
- * Utility class that contains the default set of write metrics.
- *
- * @param context {@link Context} for this task
- * @param table underlying table
- * @param tableId table Id
- */
- public DefaultTableWriteMetrics(Context context, Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
- putNs = tableMetricsUtil.newTimer("put-ns");
- putAllNs = tableMetricsUtil.newTimer("putAll-ns");
- deleteNs = tableMetricsUtil.newTimer("delete-ns");
- deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
- flushNs = tableMetricsUtil.newTimer("flush-ns");
- numPuts = tableMetricsUtil.newCounter("num-puts");
- numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
- numDeletes = tableMetricsUtil.newCounter("num-deletes");
- numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
- numFlushes = tableMetricsUtil.newCounter("num-flushes");
- putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns");
- deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
index 1b19272..90d6fe8 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
@@ -53,7 +53,7 @@ public class TableMetricsUtil {
Preconditions.checkNotNull(table);
Preconditions.checkNotNull(tableId);
- this.metricsRegistry = context.getTaskContext().getTaskMetricsRegistry();
+ this.metricsRegistry = context.getContainerContext().getContainerMetricsRegistry();
this.groupName = table.getClass().getSimpleName();
this.tableId = tableId;
}
@@ -87,6 +87,18 @@ public class TableMetricsUtil {
return metricsRegistry.newGauge(groupName, new SupplierGauge(getMetricFullName(name), supplier));
}
+ public static void incCounter(Counter counter) {
+ if (counter != null) {
+ counter.inc();
+ }
+ }
+
+ public static void updateTimer(Timer timer, long duration) {
+ if (timer != null) {
+ timer.update(duration);
+ }
+ }
+
private String getMetricFullName(String name) {
return String.format("%s-%s", tableId, name);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
new file mode 100644
index 0000000..0775844
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Utility class that contains the default set of read metrics.
+ */
+public class TableReadMetrics {
+
+ public final Timer getNs;
+ public final Timer getAllNs;
+ public final Counter numGets;
+ public final Counter numGetAlls;
+ public final Timer getCallbackNs;
+ public final Counter numMissedLookups;
+
+ /**
+ * Constructor based on container and task container context
+ *
+ * @param context {@link Context} for this task
+ * @param table underlying table
+ * @param tableId table Id
+ */
+ public TableReadMetrics(Context context, Table table, String tableId) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+ numGets = tableMetricsUtil.newCounter("num-gets");
+ numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+ numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
+
+ MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+ if (metricsConfig.getMetricsTimerEnabled()) {
+ getNs = tableMetricsUtil.newTimer("get-ns");
+ getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+ getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns");
+ } else {
+ getNs = null;
+ getAllNs = null;
+ getCallbackNs = null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
new file mode 100644
index 0000000..02af35f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+public class TableWriteMetrics {
+
+ public final Timer putNs;
+ public final Timer putAllNs;
+ public final Timer deleteNs;
+ public final Timer deleteAllNs;
+ public final Timer flushNs;
+ public final Counter numPuts;
+ public final Counter numPutAlls;
+ public final Counter numDeletes;
+ public final Counter numDeleteAlls;
+ public final Counter numFlushes;
+ public final Timer putCallbackNs;
+ public final Timer deleteCallbackNs;
+
+ /**
+ * Utility class that contains the default set of write metrics.
+ *
+ * @param context {@link Context} for this task
+ * @param table underlying table
+ * @param tableId table Id
+ */
+ public TableWriteMetrics(Context context, Table table, String tableId) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+ numPuts = tableMetricsUtil.newCounter("num-puts");
+ numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+ numDeletes = tableMetricsUtil.newCounter("num-deletes");
+ numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+ numFlushes = tableMetricsUtil.newCounter("num-flushes");
+
+ MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+ if (metricsConfig.getMetricsTimerEnabled()) {
+ putNs = tableMetricsUtil.newTimer("put-ns");
+ putAllNs = tableMetricsUtil.newTimer("putAll-ns");
+ deleteNs = tableMetricsUtil.newTimer("delete-ns");
+ deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
+ flushNs = tableMetricsUtil.newTimer("flush-ns");
+ putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns");
+ deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns");
+ } else {
+ putNs = null;
+ putAllNs = null;
+ deleteNs = null;
+ deleteAllNs = null;
+ flushNs = null;
+ putCallbackNs = null;
+ deleteCallbackNs = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
index 2775ca7..bd3ef18 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Sets;
@@ -49,10 +50,30 @@ public class TestJavaTableConfig {
@Test
public void testGetTableProperties() {
Map<String, String> map = new HashMap<>();
- map.put("tables.t1.spec", "t1-spec");
+ map.put("stores.t1.key.serde", "key-serde");
+ map.put("stores.t1.msg.serde", "msg-serde");
map.put("tables.t1.provider.factory", "t1-provider-factory");
JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
assertEquals("t1-provider-factory", tableConfig.getTableProviderFactory("t1"));
+ assertEquals("key-serde", tableConfig.getKeySerde("t1"));
+ assertEquals("msg-serde", tableConfig.getMsgSerde("t1"));
}
+ @Test
+ public void testBuildKey() {
+ String key = JavaTableConfig.buildKey("t1", "abc");
+ Assert.assertEquals("tables.t1.abc", key);
+ }
+
+ @Test
+ public void testGetForTable() {
+ Map<String, String> map = new HashMap<>();
+ map.put(JavaTableConfig.buildKey("t1", "abc"), "xyz");
+ JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
+ Assert.assertEquals("xyz", tableConfig.getForTable("t1", "abc"));
+ Assert.assertNull(tableConfig.getForTable("t1", "aaa"));
+ Assert.assertEquals("xyz", tableConfig.getForTable("t1", "aaa", "xyz"));
+ Assert.assertNull(tableConfig.getForTable("tt", "abc"));
+ Assert.assertEquals("xyz", tableConfig.getForTable("tt", "abc", "xyz"));
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index bfb329a..daaba46 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -24,6 +24,7 @@ import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.context.Context;
import org.apache.samza.context.MockContext;
import org.apache.samza.metrics.Counter;
@@ -59,14 +60,13 @@ import java.util.concurrent.Executors;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+
public class TestCachingTable {
+
+ private MetricsRegistry metricsRegistry;
+
@Test
public void testSerializeSimple() {
doTestSerialize(null);
@@ -139,15 +139,23 @@ public class TestCachingTable {
}
private void initTables(ReadableTable ... tables) {
+ initTables(false, tables);
+ }
+
+ private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) {
+ Map<String, String> config = new HashMap<>();
+ if (isTimerMetricsDisabled) {
+ config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+ }
Context context = new MockContext();
- MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+ doReturn(new MapConfig(config)).when(context.getJobContext()).getConfig();
+ metricsRegistry = mock(MetricsRegistry.class);
doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
- when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(metricsRegistry);
- for (ReadableTable table : tables) {
- table.init(context);
- }
+ doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
+
+ Arrays.asList(tables).forEach(t -> t.init(context));
}
private void doTestCacheOps(boolean isWriteAround) {
@@ -183,7 +191,7 @@ public class TestCachingTable {
return null;
}).when(context.getTaskContext()).getTable(anyString());
- when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+ when(context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
Map<String, String> tableConfig = desc.toConfig(new MapConfig());
when(context.getJobContext().getConfig()).thenReturn(new MapConfig(tableConfig));
@@ -284,6 +292,20 @@ public class TestCachingTable {
initTables(cachingTable, guavaTable, remoteTable);
+ // 3 per readable table (9)
+ // 5 per read/write table (15)
+ verify(metricsRegistry, times(24)).newCounter(any(), anyString());
+
+ // 3 per readable table (9)
+ // 7 per read/write table (21)
+ // 1 per remote readable table (1)
+ // 1 per remote read/write table (1)
+ verify(metricsRegistry, times(32)).newTimer(any(), anyString());
+
+ // 1 per guava table (1)
+ // 3 per caching table (2)
+ verify(metricsRegistry, times(4)).newGauge(anyString(), any());
+
// GET
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any());
Assert.assertEquals(cachingTable.getAsync("foo").get(), "bar");
@@ -364,6 +386,50 @@ public class TestCachingTable {
Assert.assertNull(guavaCache.getIfPresent("foo3"));
}
+ @Test
+ public void testTimerDisabled() throws Exception {
+ String tableId = "testTimerDisabled";
+
+ Cache<String, String> guavaCache = CacheBuilder.newBuilder().initialCapacity(100).build();
+ final ReadWriteTable<String, String> guavaTable = new GuavaCacheTable<>(tableId, guavaCache);
+
+ TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
+
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ doReturn(CompletableFuture.completedFuture("")).when(readFn).getAsync(any());
+
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
+ doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
+
+ final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+ tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
+ Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
+
+ final CachingTable<String, String> cachingTable = new CachingTable<>(
+ tableId, remoteTable, guavaTable, false);
+
+ initTables(true, cachingTable, guavaTable, remoteTable);
+
+ cachingTable.get("");
+ cachingTable.getAsync("").get();
+ cachingTable.getAll(Collections.emptyList());
+ cachingTable.getAllAsync(Collections.emptyList());
+ cachingTable.flush();
+ cachingTable.put("", "");
+ cachingTable.putAsync("", "");
+ cachingTable.putAll(Collections.emptyList());
+ cachingTable.putAllAsync(Collections.emptyList());
+ cachingTable.delete("");
+ cachingTable.deleteAsync("");
+ cachingTable.deleteAll(Collections.emptyList());
+ cachingTable.deleteAllAsync(Collections.emptyList());
+
+ verify(metricsRegistry, atLeast(1)).newCounter(any(), anyString());
+ verify(metricsRegistry, atLeast(1)).newGauge(anyString(), any());
+ verify(metricsRegistry, times(0)).newTimer(any(), anyString());
+ }
+
private TableDescriptor createDummyTableDescriptor(String tableId) {
BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class);
when(tableDescriptor.getTableId()).thenReturn(tableId);
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
index d1369d0..d7733a8 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
@@ -62,7 +62,7 @@ public class TestRemoteReadWriteTable {
doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
- doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+ doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
return context;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 69e95d4..7a75c90 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -132,13 +132,16 @@ public class TestRemoteTableDescriptor {
private Context createMockContext(TableDescriptor tableDescriptor) {
Context context = mock(Context.class);
- TaskContextImpl taskContext = mock(TaskContextImpl.class);
- when(context.getTaskContext()).thenReturn(taskContext);
+ ContainerContext containerContext = mock(ContainerContext.class);
+ when(context.getContainerContext()).thenReturn(containerContext);
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
when(metricsRegistry.newTimer(anyString(), anyString())).thenReturn(mock(Timer.class));
when(metricsRegistry.newCounter(anyString(), anyString())).thenReturn(mock(Counter.class));
- when(taskContext.getTaskMetricsRegistry()).thenReturn(metricsRegistry);
+ when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+ TaskContextImpl taskContext = mock(TaskContextImpl.class);
+ when(context.getTaskContext()).thenReturn(taskContext);
TaskName taskName = new TaskName("MyTask");
TaskModel taskModel = mock(TaskModel.class);
@@ -147,10 +150,7 @@ public class TestRemoteTableDescriptor {
ContainerModel containerModel = mock(ContainerModel.class);
when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel));
-
- ContainerContext containerContext = mock(ContainerContext.class);
when(containerContext.getContainerModel()).thenReturn(containerModel);
- when(context.getContainerContext()).thenReturn(containerContext);
String containerId = "container-1";
JobModel jobModel = mock(JobModel.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
index e0107f9..98d3768 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
@@ -20,9 +20,12 @@ package org.apache.samza.storage.kv;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.DefaultTableWriteMetrics;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
/**
@@ -34,8 +37,6 @@ import org.apache.samza.table.utils.DefaultTableWriteMetrics;
public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
implements ReadWriteTable<K, V> {
- protected DefaultTableWriteMetrics writeMetrics;
-
/**
* Constructs an instance of {@link LocalReadWriteTable}
* @param tableId the table Id
@@ -46,18 +47,9 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
}
@Override
- public void init(Context context) {
- super.init(context);
- writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
- }
-
- @Override
public void put(K key, V value) {
if (value != null) {
- writeMetrics.numPuts.inc();
- long startNs = System.nanoTime();
- kvStore.put(key, value);
- writeMetrics.putNs.update(System.nanoTime() - startNs);
+ instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value));
} else {
delete(key);
}
@@ -77,10 +69,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
@Override
public void putAll(List<Entry<K, V>> entries) {
- writeMetrics.numPutAlls.inc();
- long startNs = System.nanoTime();
- kvStore.putAll(entries);
- writeMetrics.putAllNs.update(System.nanoTime() - startNs);
+ instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(entries));
}
@Override
@@ -97,10 +86,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
@Override
public void delete(K key) {
- writeMetrics.numDeletes.inc();
- long startNs = System.nanoTime();
- kvStore.delete(key);
- writeMetrics.deleteNs.update(System.nanoTime() - startNs);
+ instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key));
}
@Override
@@ -117,10 +103,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
@Override
public void deleteAll(List<K> keys) {
- writeMetrics.numDeleteAlls.inc();
- long startNs = System.nanoTime();
- kvStore.deleteAll(keys);
- writeMetrics.deleteAllNs.update(System.nanoTime() - startNs);
+ instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys));
}
@Override
@@ -137,10 +120,18 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
@Override
public void flush() {
- writeMetrics.numFlushes.inc();
+ instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush());
+ }
+
+ private interface Func0 {
+ void apply();
+ }
+
+ private void instrument(Counter counter, Timer timer, Func0 func) {
+ incCounter(counter);
long startNs = System.nanoTime();
- kvStore.flush();
- writeMetrics.flushNs.update(System.nanoTime() - startNs);
+ func.apply();
+ updateTimer(timer, System.nanoTime() - startNs);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
index f314918..ba0d3cf 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
@@ -19,14 +19,19 @@
package org.apache.samza.storage.kv;
import com.google.common.base.Preconditions;
+
+import com.google.common.base.Supplier;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.utils.DefaultTableReadMetrics;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.BaseReadableTable;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
/**
* A store backed readable table
@@ -34,12 +39,9 @@ import org.apache.samza.table.utils.DefaultTableReadMetrics;
* @param <K> the type of the key in this table
* @param <V> the type of the value in this table
*/
-public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
+public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
protected final KeyValueStore<K, V> kvStore;
- protected final String tableId;
-
- protected DefaultTableReadMetrics readMetrics;
/**
* Constructs an instance of {@link LocalReadableTable}
@@ -47,25 +49,16 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
* @param kvStore the backing store
*/
public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
- Preconditions.checkArgument(tableId != null & !tableId.isEmpty() , "invalid tableId");
+ super(tableId);
Preconditions.checkNotNull(kvStore, "null KeyValueStore");
- this.tableId = tableId;
this.kvStore = kvStore;
}
@Override
- public void init(Context context) {
- readMetrics = new DefaultTableReadMetrics(context, this, tableId);
- }
-
- @Override
public V get(K key) {
- readMetrics.numGets.inc();
- long startNs = System.nanoTime();
- V result = kvStore.get(key);
- readMetrics.getNs.update(System.nanoTime() - startNs);
+ V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key));
if (result == null) {
- readMetrics.numMissedLookups.inc();
+ incCounter(readMetrics.numMissedLookups);
}
return result;
}
@@ -83,11 +76,8 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
@Override
public Map<K, V> getAll(List<K> keys) {
- readMetrics.numGetAlls.inc();
- long startNs = System.nanoTime();
- Map<K, V> result = kvStore.getAll(keys);
- readMetrics.getAllNs.update(System.nanoTime() - startNs);
- result.values().stream().filter(Objects::isNull).map(v -> readMetrics.numMissedLookups.inc());
+ Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys));
+ result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
return result;
}
@@ -107,4 +97,12 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> {
// The KV store is not closed here as it may still be needed by downstream operators,
// it will be closed by the SamzaContainer
}
+
+ private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
+ incCounter(counter);
+ long startNs = System.nanoTime();
+ T result = func.get();
+ updateTimer(timer, System.nanoTime() - startNs);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
new file mode 100644
index 0000000..5531951
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Collections;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadWriteTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalReadWriteTable {
+
+ public static final String TABLE_ID = "t1";
+
+ private Timer putNs;
+ private Timer putAllNs;
+ private Timer deleteNs;
+ private Timer deleteAllNs;
+ private Timer flushNs;
+ private Counter numPuts;
+ private Counter numPutAlls;
+ private Counter numDeletes;
+ private Counter numDeleteAlls;
+ private Counter numFlushes;
+ private Timer putCallbackNs;
+ private Timer deleteCallbackNs;
+
+ private MetricsRegistry metricsRegistry;
+
+ private KeyValueStore kvStore;
+
+ @Before
+ public void setUp() {
+
+ putNs = new Timer("");
+ putAllNs = new Timer("");
+ deleteNs = new Timer("");
+ deleteAllNs = new Timer("");
+ flushNs = new Timer("");
+ numPuts = new Counter("");
+ numPutAlls = new Counter("");
+ numDeletes = new Counter("");
+ numDeleteAlls = new Counter("");
+ numFlushes = new Counter("");
+ putCallbackNs = new Timer("");
+ deleteCallbackNs = new Timer("");
+
+ metricsRegistry = mock(MetricsRegistry.class);
+ String groupName = LocalReadWriteTable.class.getSimpleName();
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
+ when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
+ when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
+
+ kvStore = mock(KeyValueStore.class);
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ ReadWriteTable table = createTable(false);
+ table.put("k1", "v1");
+ table.putAsync("k2", "v2").get();
+ verify(kvStore, times(2)).put(any(), any());
+ Assert.assertEquals(2, numPuts.getCount());
+ Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeletes.getCount());
+ Assert.assertEquals(0, numDeleteAlls.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testPutAll() throws Exception {
+ ReadWriteTable table = createTable(false);
+ table.putAll(Collections.emptyList());
+ table.putAllAsync(Collections.emptyList()).get();
+ verify(kvStore, times(2)).putAll(any());
+ Assert.assertEquals(2, numPutAlls.getCount());
+ Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numDeletes.getCount());
+ Assert.assertEquals(0, numDeleteAlls.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ ReadWriteTable table = createTable(false);
+ table.delete("");
+ table.deleteAsync("").get();
+ verify(kvStore, times(2)).delete(any());
+ Assert.assertEquals(2, numDeletes.getCount());
+ Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeleteAlls.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testDeleteAll() throws Exception {
+ ReadWriteTable table = createTable(false);
+ table.deleteAll(Collections.emptyList());
+ table.deleteAllAsync(Collections.emptyList()).get();
+ verify(kvStore, times(2)).deleteAll(any());
+ Assert.assertEquals(2, numDeleteAlls.getCount());
+ Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeletes.getCount());
+ Assert.assertEquals(0, numFlushes.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testFlush() {
+ ReadWriteTable table = createTable(false);
+ table.flush();
+ table.flush();
+ verify(kvStore, times(2)).flush();
+ Assert.assertEquals(2, numFlushes.getCount());
+ Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, numPuts.getCount());
+ Assert.assertEquals(0, numPutAlls.getCount());
+ Assert.assertEquals(0, numDeletes.getCount());
+ Assert.assertEquals(0, numDeleteAlls.getCount());
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ @Test
+ public void testTimerDisabled() throws Exception {
+ ReadWriteTable table = createTable(true);
+ table.put("", "");
+ table.putAsync("", "").get();
+ table.putAll(Collections.emptyList());
+ table.putAllAsync(Collections.emptyList()).get();
+ table.delete("");
+ table.deleteAsync("").get();
+ table.deleteAll(Collections.emptyList());
+ table.deleteAllAsync(Collections.emptyList()).get();
+ table.flush();
+ verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
+ verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
+ verify(metricsRegistry, times(0)).newGauge(anyString(), any());
+ Assert.assertEquals(1, numFlushes.getCount());
+ Assert.assertEquals(2, numPuts.getCount());
+ Assert.assertEquals(2, numPutAlls.getCount());
+ Assert.assertEquals(2, numDeletes.getCount());
+ Assert.assertEquals(2, numDeleteAlls.getCount());
+ Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+ Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+ }
+
+ private LocalReadWriteTable createTable(boolean isTimerDisabled) {
+ Map<String, String> config = new HashMap<>();
+ if (isTimerDisabled) {
+ config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+ }
+ Context context = mock(Context.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(context.getJobContext()).thenReturn(jobContext);
+ when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+ ContainerContext containerContext = mock(ContainerContext.class);
+ when(context.getContainerContext()).thenReturn(containerContext);
+ when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+ LocalReadWriteTable table = new LocalReadWriteTable("t1", kvStore);
+ table.init(context);
+
+ return table;
+ }
+}