You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2019/02/06 01:20:08 UTC
[samza] branch master updated: SAMZA-2097: Allow table parts
configuration generation
This is an automated email from the ASF dual-hosted git repository.
weisong44 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 4e5ffab SAMZA-2097: Allow table parts configuration generation
4e5ffab is described below
commit 4e5ffababfdc18ee41ba8af60f04a4d1755a7ad1
Author: Wei Song <ws...@linkedin.com>
AuthorDate: Tue Feb 5 17:19:56 2019 -0800
SAMZA-2097: Allow table parts configuration generation
Sometimes a table part uses third-party libraries that require certain configuration; this configuration cannot be handled from within the table descriptor itself. We should allow table parts to participate in configuration generation in the toConfig() method.
Author: Wei Song <ws...@linkedin.com>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #906 from weisong44/SAMZA-2097
---
.../table/descriptors/RemoteTableDescriptor.java | 48 +++++++++--
.../org/apache/samza/table/remote/TablePart.java | 47 +++++++++++
.../samza/table/remote/TableRateLimiter.java | 2 +-
.../samza/table/remote/TableReadFunction.java | 2 +-
.../samza/table/remote/TableWriteFunction.java | 2 +-
.../apache/samza/table/retry/TableRetryPolicy.java | 4 +-
.../descriptors/TestRemoteTableDescriptor.java | 98 ++++++++++++++++++++--
7 files changed, 184 insertions(+), 19 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 4b15c47..f8effdc 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -26,6 +26,8 @@ import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.table.remote.TablePart;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
@@ -225,13 +227,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
- // Serialize and store reader/writer functions
- addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
-
- if (writeFn != null) {
- addTableConfig(WRITE_FN, SerdeUtils.serialize("write function", writeFn), tableConfig);
- }
-
+ // Handle rate limiter
if (!tagCreditsMap.isEmpty()) {
RateLimiter defaultRateLimiter;
try {
@@ -243,29 +239,52 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
throw new SamzaException("Failed to create default rate limiter", ex);
}
addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
+ if (defaultRateLimiter instanceof TablePart) {
+ addTablePartConfig((TablePart) defaultRateLimiter, jobConfig, tableConfig);
+ }
} else if (rateLimiter != null) {
addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
+ if (rateLimiter instanceof TablePart) {
+ addTablePartConfig((TablePart) rateLimiter, jobConfig, tableConfig);
+ }
}
- // Serialize the readCredit functions
+ // Handle readCredit functions
if (readCreditFn != null) {
addTableConfig(READ_CREDIT_FN, SerdeUtils.serialize("read credit function", readCreditFn), tableConfig);
+ addTablePartConfig(readCreditFn, jobConfig, tableConfig);
}
- // Serialize the writeCredit functions
+
+ // Handle writeCredit functions
if (writeCreditFn != null) {
addTableConfig(WRITE_CREDIT_FN, SerdeUtils.serialize("write credit function", writeCreditFn), tableConfig);
+ addTablePartConfig(writeCreditFn, jobConfig, tableConfig);
}
+ // Handle read retry policy
if (readRetryPolicy != null) {
addTableConfig(READ_RETRY_POLICY, SerdeUtils.serialize("read retry policy", readRetryPolicy), tableConfig);
+ addTablePartConfig(readRetryPolicy, jobConfig, tableConfig);
}
+ // Handle write retry policy
if (writeRetryPolicy != null) {
addTableConfig(WRITE_RETRY_POLICY, SerdeUtils.serialize("write retry policy", writeRetryPolicy), tableConfig);
+ addTablePartConfig(writeRetryPolicy, jobConfig, tableConfig);
}
addTableConfig(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize), tableConfig);
+ // Handle table reader function
+ addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
+ addTablePartConfig(readFn, jobConfig, tableConfig);
+
+ // Handle table write function
+ if (writeFn != null) {
+ addTableConfig(WRITE_FN, SerdeUtils.serialize("write function", writeFn), tableConfig);
+ addTablePartConfig(writeFn, jobConfig, tableConfig);
+ }
+
return Collections.unmodifiableMap(tableConfig);
}
@@ -278,4 +297,15 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
"too many threads for async callback executor.");
}
+
+ /**
+ * Merges the configuration generated by {@code tablePart} into {@code tableConfig}; keys it returns overwrite existing entries.
+ * @param tablePart table part whose {@link TablePart#toConfig} output is merged
+ * @param jobConfig job configuration, passed through unchanged to the table part
+ * @param tableConfig table configuration generated so far; updated in place with the part's entries
+ */
+ protected void addTablePartConfig(TablePart tablePart, Config jobConfig, Map<String, String> tableConfig) {
+ tableConfig.putAll(tablePart.toConfig(jobConfig, new MapConfig(tableConfig))); // NOTE(review): putAll does not guard against a part overriding keys set earlier in toConfig()
+ }
+
}
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TablePart.java b/samza-api/src/main/java/org/apache/samza/table/remote/TablePart.java
new file mode 100644
index 0000000..d54f248
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TablePart.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+
+/**
+ * A building block of a remote table (a read/write function, credit function, retry policy, or a rate limiter that implements this interface) that may contribute configuration.
+ */
+@InterfaceStability.Unstable
+public interface TablePart {
+
+ /**
+ * Generate configuration for this building block. Some building blocks, or their external dependencies
+ * (e.g. third-party libraries), require certain configuration; this method allows that configuration
+ * to be generated and included in the job configuration.
+ *
+ * @param jobConfig job configuration
+ * @param tableConfig configuration generated so far for this table
+ * @return configuration for this building block; the default implementation returns an empty map
+ */
+ default Map<String, String> toConfig(Config jobConfig, Config tableConfig) {
+ return Collections.emptyMap();
+ }
+
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
index 0758dd2..f52bcbe 100644
--- a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
@@ -58,7 +58,7 @@ public class TableRateLimiter<K, V> {
* @param <V> the type of the value
*/
@InterfaceStability.Unstable
- public interface CreditFunction<K, V> extends Serializable {
+ public interface CreditFunction<K, V> extends TablePart, Serializable {
/**
* Get the number of credits required for the {@code key} and {@code value} pair.
* @param key table key
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
index d54f83d..04fc918 100644
--- a/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -46,7 +46,7 @@ import com.google.common.collect.Iterables;
* @param <V> the type of the value in this table
*/
@InterfaceStability.Unstable
-public interface TableReadFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+public interface TableReadFunction<K, V> extends TablePart, InitableFunction, ClosableFunction, Serializable {
/**
* Fetch single table record for a specified {@code key}. This method must be thread-safe.
* The default implementation calls getAsync and blocks on the completion afterwards.
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
index 1e3dc4c..3b06664 100644
--- a/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Iterables;
* @param <V> the type of the value in this table
*/
@InterfaceStability.Unstable
-public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+public interface TableWriteFunction<K, V> extends TablePart, InitableFunction, ClosableFunction, Serializable {
/**
* Store single table {@code record} with specified {@code key}. This method must be thread-safe.
* The default implementation calls putAsync and blocks on the completion afterwards.
diff --git a/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
index 162eb07..078bce6 100644
--- a/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
+++ b/samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
@@ -24,6 +24,7 @@ import java.time.Duration;
import java.util.function.Predicate;
import com.google.common.base.Preconditions;
+import org.apache.samza.table.remote.TablePart;
/**
@@ -35,7 +36,8 @@ import com.google.common.base.Preconditions;
*
* Retry libraries can implement a subset or all features as described by this common policy.
*/
-public class TableRetryPolicy implements Serializable {
+public class TableRetryPolicy implements TablePart, Serializable {
+
enum BackoffType {
/**
* No backoff in between two retry attempts.
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 4703752..ab87b89 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -20,6 +20,8 @@
package org.apache.samza.table.remote.descriptors;
import com.google.common.collect.ImmutableMap;
+
+import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
@@ -41,6 +43,7 @@ import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
import org.apache.samza.table.remote.AsyncRemoteTable;
import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.table.remote.RemoteTableProvider;
+import org.apache.samza.table.remote.TablePart;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
@@ -57,12 +60,21 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
-import static org.mockito.Mockito.*;
+import static org.apache.samza.table.remote.TableRateLimiter.CreditFunction;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
public class TestRemoteTableDescriptor {
- private void doTestSerialize(RateLimiter rateLimiter, TableRateLimiter.CreditFunction readCredFn,
- TableRateLimiter.CreditFunction writeCredFn) {
+
+ private void doTestSerialize(RateLimiter rateLimiter, CreditFunction readCredFn, CreditFunction writeCredFn) {
String tableId = "1";
RemoteTableDescriptor desc = new RemoteTableDescriptor(tableId)
.withReadFunction(createMockTableReadFunction())
@@ -132,6 +144,65 @@ public class TestRemoteTableDescriptor {
desc.toConfig(new MapConfig());
}
+ @Test
+ public void testTablePartToConfigDefault() { // only the read function is customized; other parts are left to the descriptor
+ TableReadFunction readFn = createMockTableReadFunction();
+ when(readFn.toConfig(any(), any())).thenReturn(createConfigPair(1)); // readFn contributes k1=v1
+ Map<String, String> tableConfig = new RemoteTableDescriptor("1")
+ .withReadFunction(readFn)
+ .withReadRateLimit(100) // NOTE(review): presumably exercises the default rate limiter branch of toConfig() — confirm
+ .toConfig(new MapConfig());
+ verify(readFn, times(1)).toConfig(any(), any()); // the read function is consulted exactly once
+ Assert.assertEquals("v1", tableConfig.get("k1")); // ...and its pair is merged into the table config
+ }
+
+ @Test
+ public void testTablePartToConfig() {
+ // Each mocked table part contributes a distinct pair k<n>=v<n>, for n in [0, keys).
+ int keys = 0;
+
+ TableReadFunction readFn = createMockTableReadFunction();
+ when(readFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+ TableWriteFunction writeFn = createMockTableWriteFunction();
+ when(writeFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+ RateLimiter rateLimiter = createMockRateLimiter();
+ when(((TablePart) rateLimiter).toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+ CreditFunction readCredFn = createMockCreditFunction();
+ when(readCredFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+ CreditFunction writeCredFn = createMockCreditFunction();
+ when(writeCredFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+ TableRetryPolicy readRetryPolicy = createMockTableRetryPolicy();
+ when(readRetryPolicy.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+ TableRetryPolicy writeRetryPolicy = createMockTableRetryPolicy();
+ when(writeRetryPolicy.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+ Map<String, String> tableConfig = new RemoteTableDescriptor("1")
+ .withReadFunction(readFn)
+ .withWriteFunction(writeFn)
+ .withRateLimiter(rateLimiter, readCredFn, writeCredFn)
+ .withReadRetryPolicy(readRetryPolicy)
+ .withWriteRetryPolicy(writeRetryPolicy)
+ .toConfig(new MapConfig());
+ // Every table part must be consulted exactly once...
+ verify(readFn, times(1)).toConfig(any(), any());
+ verify(writeFn, times(1)).toConfig(any(), any());
+ verify((TablePart) rateLimiter, times(1)).toConfig(any(), any());
+ verify(readCredFn, times(1)).toConfig(any(), any());
+ verify(writeCredFn, times(1)).toConfig(any(), any());
+ verify(readRetryPolicy, times(1)).toConfig(any(), any());
+ verify(writeRetryPolicy, times(1)).toConfig(any(), any());
+ // ...and every contributed pair must appear in the generated table config.
+ for (int n = 0; n < keys; n++) {
+ Assert.assertEquals("v" + n, tableConfig.get("k" + n));
+ }
+ }
+
private Context createMockContext(TableDescriptor tableDescriptor) {
Context context = mock(Context.class);
@@ -161,13 +232,14 @@ public class TestRemoteTableDescriptor {
when(jobModel.getContainers()).thenReturn(ImmutableMap.of(containerId, containerModel));
JobContext jobContext = mock(JobContext.class);
- when(jobContext.getConfig()).thenReturn(new MapConfig(tableDescriptor.toConfig(new MapConfig())));
+ Config jobConfig = new MapConfig(tableDescriptor.toConfig(new MapConfig()));
+ when(jobContext.getConfig()).thenReturn(jobConfig);
when(context.getJobContext()).thenReturn(jobContext);
return context;
}
- static class CountingCreditFunction<K, V> implements TableRateLimiter.CreditFunction<K, V> {
+ static class CountingCreditFunction<K, V> implements CreditFunction<K, V> {
int numCalls = 0;
@Override
public int getCredits(K key, V value) {
@@ -270,7 +342,11 @@ public class TestRemoteTableDescriptor {
}
private RateLimiter createMockRateLimiter() {
- return mock(RateLimiter.class, withSettings().serializable());
+ return mock(RateLimiter.class, withSettings().serializable().extraInterfaces(TablePart.class)); // also a TablePart so tests can stub toConfig()
+ }
+ // Serializable mock: the descriptor serializes credit functions into the table config.
+ private CreditFunction createMockCreditFunction() {
+ return mock(CreditFunction.class, withSettings().serializable());
}
private TableReadFunction createMockTableReadFunction() {
@@ -281,6 +357,16 @@ public class TestRemoteTableDescriptor {
return mock(TableWriteFunction.class, withSettings().serializable());
}
+ private TableRetryPolicy createMockTableRetryPolicy() { // serializable mock so toConfig() can be stubbed on this concrete class
+ return mock(TableRetryPolicy.class, withSettings().serializable());
+ }
+
+ private Map<String, String> createConfigPair(int n) { // returns a mutable single-entry map "k<n>" -> "v<n>"
+ Map<String, String> config = new HashMap<>();
+ config.put("k" + n, "v" + n);
+ return config;
+ }
+
private void assertExists(String key, String tableId, Map<String, String> config) {
String realKey = JavaTableConfig.buildKey(tableId, key);
Assert.assertTrue(config.containsKey(realKey));