You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/04/23 20:20:50 UTC

samza git commit: SAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes.

Repository: samza
Updated Branches:
  refs/heads/master 8b7417bca -> 38559796e


SAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes.

In general, the JobModel configuration contains service access tokens and certificates. It is common practice to run ZooKeeper without ACLs, in which case any client that can connect to ZooKeeper can read the data nodes. Hence, for security purposes, it is essential not to store the configuration as a part of the JobModel in ZooKeeper.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #479 from shanthoosh/nuke_configuration_stored_in_JobModel and squashes the following commits:

b8d2196 [Shanthoosh Venkataraman] Address review comments.
7876a44 [Shanthoosh Venkataraman] Nuke JobModel configuration in ZkJobCoordinator.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/38559796
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/38559796
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/38559796

Branch: refs/heads/master
Commit: 38559796edf28fd81d9d59078d312aa06d7d075d
Parents: 8b7417b
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Mon Apr 23 13:20:45 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Apr 23 13:20:45 2018 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 9 ++++++---
 .../samza/test/processor/TestZkLocalApplicationRunner.java  | 6 ++++++
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/38559796/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 2b4bc8b..1134d6f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
@@ -239,13 +240,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     // Create checkpoint and changelog streams if they don't exist
     if (!hasCreatedStreams) {
-      CheckpointManager checkpointManager = new TaskConfigJava(jobModel.getConfig()).getCheckpointManager(metrics.getMetricsRegistry());
+      CheckpointManager checkpointManager = new TaskConfigJava(config).getCheckpointManager(metrics.getMetricsRegistry());
       if (checkpointManager != null) {
         checkpointManager.createResources();
       }
 
       // Pass in null Coordinator consumer and producer because ZK doesn't have coordinator streams.
-      ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions);
+      ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
       hasCreatedStreams = true;
     }
 
@@ -348,7 +349,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
      * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
      * to host mapping) is passed in as null when building the jobModel.
      */
-    return JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
+    JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
+    // Nuke the configuration in JobModel.
+    return new JobModel(new MapConfig(), model.getContainers());
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {

http://git-wip-us.apache.org/repos/asf/samza/blob/38559796/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 97fe1f8..417398d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -280,6 +280,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion));
     // Job model before and after the addition of second stream processor should be the same.
     assertEquals(previousJobModel[0], updatedJobModel);
+
+    assertEquals(new MapConfig(), updatedJobModel.getConfig());
     // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
     // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
     assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
@@ -361,7 +363,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Task names in the job model should be different but the set of partitions should be the same and each task name
     // should be assigned to a different container.
+    assertEquals(new MapConfig(), previousJobModel[0].getConfig());
     assertEquals(previousJobModel[0].getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1);
+
+    assertEquals(new MapConfig(), updatedJobModel.getConfig());
     assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1);
     assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks().size(), 1);
     Map<TaskName, TaskModel> updatedTaskModelMap1 = updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks();
@@ -406,6 +411,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Verifications before killing the leader.
     String jobModelVersion = zkUtils.getJobModelVersion();
     JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+    assertEquals(new MapConfig(), jobModel.getConfig());
     assertEquals(3, jobModel.getContainers().size());
     assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), jobModel.getContainers().keySet());
     assertEquals("1", jobModelVersion);