You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/11/15 18:27:07 UTC
samza git commit: SAMZA-1501: Validate operator IDs so that they
don't contain special characters and spaces
Repository: samza
Updated Branches:
refs/heads/master 6fcf7f3f4 -> 4ac62231c
SAMZA-1501: Validate operator IDs so that they don't contain special characters and spaces
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jacob Maes <jm...@apache.org>, Jagadish Venkatraman <vj...@gmail.com>
Closes #359 from prateekm/operator-id-validation
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4ac62231
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4ac62231
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4ac62231
Branch: refs/heads/master
Commit: 4ac62231c5bd8379a6bd8694140fada92e76a77c
Parents: 6fcf7f3
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Nov 15 10:27:08 2017 -0800
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Nov 15 10:27:08 2017 -0800
----------------------------------------------------------------------
.../apache/samza/operators/StreamGraphImpl.java | 7 ++++
.../samza/operators/TestStreamGraphImpl.java | 38 ++++++++++++++++++++
2 files changed, 45 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4ac62231/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 936cb3a..5323c32 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -43,6 +43,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -51,6 +52,7 @@ import java.util.stream.Collectors;
*/
public class StreamGraphImpl implements StreamGraph {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class);
+ private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+");
// We use a LHM for deterministic order in initializing and closing operators.
private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
@@ -58,6 +60,7 @@ public class StreamGraphImpl implements StreamGraph {
private final ApplicationRunner runner;
private final Config config;
+
/**
* The 0-based position of the next operator in the graph.
* Part of the unique ID for each OperatorSpec in the graph.
@@ -202,6 +205,10 @@ public class StreamGraphImpl implements StreamGraph {
* @return the unique ID for the next operator in the graph
*/
/* package private */ String getNextOpId(OpCode opCode, String userDefinedId) {
+ if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
+ throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId);
+ }
+
String nextOpId = String.format("%s-%s-%s-%s",
config.get(JobConfig.JOB_NAME()),
config.get(JobConfig.JOB_ID(), "1"),
http://git-wip-us.apache.org/repos/asf/samza/blob/4ac62231/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index e0152a0..cf0a198 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators;
+import com.google.common.collect.ImmutableList;
import junit.framework.Assert;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -520,6 +521,43 @@ public class TestStreamGraphImpl {
}
@Test
+ public void testUserDefinedIdValidation() {
+ // Checks the operator-ID validation in StreamGraphImpl#getNextOpId:
+ // - blank IDs (null, "", whitespace-only) are not validated and must not throw;
+ // - IDs made only of ASCII letters, digits, '_', '-' and '.' are accepted;
+ // - IDs containing a space or another special character are rejected with a SamzaException.
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ // getNextOpId reads the job name and job id from config when it formats the operator ID.
+ when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+ when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+ // null and empty userDefinedIDs should fall back to autogenerated IDs.
+ // Whitespace-only IDs count as blank too, so none of these inputs reach the pattern check.
+ // NOTE(review): one try block covers all four calls, so the first failure hides the
+ // remaining inputs, and the message does not say which input was rejected.
+ try {
+ graph.getNextOpId(OpCode.FILTER, null);
+ graph.getNextOpId(OpCode.FILTER, "");
+ graph.getNextOpId(OpCode.FILTER, " ");
+ graph.getNextOpId(OpCode.FILTER, "\t");
+ } catch (SamzaException e) {
+ Assert.fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
+ }
+
+ // Each of these uses only characters the ID pattern allows: letters, digits, '.', '_' and '-'.
+ List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", "1000", "op_1", "OP_ID");
+ for (String validOpId: validOpIds) {
+ try {
+ graph.getNextOpId(OpCode.FILTER, validOpId);
+ } catch (Exception e) {
+ // NOTE(review): the caught exception is not attached to the failure, so its message is lost.
+ Assert.fail("Received an exception with a valid operator ID: " + validOpId);
+ }
+ }
+
+ // A space and '#' are outside the allowed character set, so getNextOpId must throw for these.
+ // Assert.fail raises a JUnit assertion error, not a SamzaException, so the catch below does
+ // not hide a missing exception. The empty catch is the expected, passing path.
+ // NOTE(review): consider naming the variable 'expected' or using @Test(expected = ...) /
+ // ExpectedException to make the intent explicit.
+ List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
+ for (String invalidOpId: invalidOpIds) {
+ try {
+ graph.getNextOpId(OpCode.FILTER, invalidOpId);
+ Assert.fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
+ } catch (SamzaException e) { }
+ }
+ }
+
+ @Test
public void testGetInputStreamPreservesInsertionOrder() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);