You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/11/15 18:27:07 UTC

samza git commit: SAMZA-1501: Validate operator IDs so that they don't contain special characters and spaces

Repository: samza
Updated Branches:
  refs/heads/master 6fcf7f3f4 -> 4ac62231c


SAMZA-1501: Validate operator IDs so that they don't contain special characters and spaces

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jacob Maes <jm...@apache.org>, Jagadish Venkatraman <vj...@gmail.com>

Closes #359 from prateekm/operator-id-validation


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4ac62231
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4ac62231
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4ac62231

Branch: refs/heads/master
Commit: 4ac62231c5bd8379a6bd8694140fada92e76a77c
Parents: 6fcf7f3
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Nov 15 10:27:08 2017 -0800
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Nov 15 10:27:08 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/operators/StreamGraphImpl.java |  7 ++++
 .../samza/operators/TestStreamGraphImpl.java    | 38 ++++++++++++++++++++
 2 files changed, 45 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4ac62231/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 936cb3a..5323c32 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -43,6 +43,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
@@ -51,6 +52,7 @@ import java.util.stream.Collectors;
  */
 public class StreamGraphImpl implements StreamGraph {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class);
+  private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+");
 
   // We use a LHM for deterministic order in initializing and closing operators.
   private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
@@ -58,6 +60,7 @@ public class StreamGraphImpl implements StreamGraph {
   private final ApplicationRunner runner;
   private final Config config;
 
+
   /**
    * The 0-based position of the next operator in the graph.
    * Part of the unique ID for each OperatorSpec in the graph.
@@ -202,6 +205,10 @@ public class StreamGraphImpl implements StreamGraph {
    * @return the unique ID for the next operator in the graph
    */
   /* package private */ String getNextOpId(OpCode opCode, String userDefinedId) {
+    if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId);
+    }
+
     String nextOpId = String.format("%s-%s-%s-%s",
         config.get(JobConfig.JOB_NAME()),
         config.get(JobConfig.JOB_ID(), "1"),

http://git-wip-us.apache.org/repos/asf/samza/blob/4ac62231/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index e0152a0..cf0a198 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators;
 
+import com.google.common.collect.ImmutableList;
 import junit.framework.Assert;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -520,6 +521,43 @@ public class TestStreamGraphImpl {
   }
 
   @Test
+  public void testUserDefinedIdValidation() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    // Null, empty, and whitespace-only user-defined IDs must not be rejected (expected to fall back to auto-generated IDs).
+    try {
+      graph.getNextOpId(OpCode.FILTER, null);
+      graph.getNextOpId(OpCode.FILTER, "");
+      graph.getNextOpId(OpCode.FILTER, " ");
+      graph.getNextOpId(OpCode.FILTER, "\t");
+    } catch (SamzaException e) {
+      Assert.fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
+    }
+
+    List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", "1000", "op_1", "OP_ID"); // letters, digits, '.', '_' and '-'
+    for (String validOpId: validOpIds) {
+      try {
+        graph.getNextOpId(OpCode.FILTER, validOpId);
+      } catch (Exception e) {
+        Assert.fail("Received an exception with a valid operator ID: " + validOpId);
+      }
+    }
+
+    List<String> invalidOpIds = ImmutableList.of("op id", "op#id"); // an embedded space and a '#' must both be rejected
+    for (String invalidOpId: invalidOpIds) {
+      try {
+        graph.getNextOpId(OpCode.FILTER, invalidOpId);
+        Assert.fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
+      } catch (SamzaException e) { /* expected: an invalid ID must raise SamzaException */ }
+    }
+  }
+
+  @Test
   public void testGetInputStreamPreservesInsertionOrder() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);