You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/04/23 20:20:50 UTC
samza git commit: SAMZA-1667: Skip storing configuration as a part of
JobModel in zookeeper data nodes.
Repository: samza
Updated Branches:
refs/heads/master 8b7417bca -> 38559796e
SAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes.
In general, the JobModel configuration contains service access tokens and certificates. It is common practice to run ZooKeeper in a non-ACL environment, in which any client that can connect to ZooKeeper can read its data nodes. Hence, for security purposes, it is essential not to store the configuration as a part of the JobModel in ZooKeeper.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #479 from shanthoosh/nuke_configuration_stored_in_JobModel and squashes the following commits:
b8d2196 [Shanthoosh Venkataraman] Address review comments.
7876a44 [Shanthoosh Venkataraman] Nuke JobModel configuration in ZkJobCoordinator.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/38559796
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/38559796
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/38559796
Branch: refs/heads/master
Commit: 38559796edf28fd81d9d59078d312aa06d7d075d
Parents: 8b7417b
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Mon Apr 23 13:20:45 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Apr 23 13:20:45 2018 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 9 ++++++---
.../samza/test/processor/TestZkLocalApplicationRunner.java | 6 ++++++
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/38559796/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 2b4bc8b..1134d6f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.ZkConfig;
@@ -239,13 +240,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
// Create checkpoint and changelog streams if they don't exist
if (!hasCreatedStreams) {
- CheckpointManager checkpointManager = new TaskConfigJava(jobModel.getConfig()).getCheckpointManager(metrics.getMetricsRegistry());
+ CheckpointManager checkpointManager = new TaskConfigJava(config).getCheckpointManager(metrics.getMetricsRegistry());
if (checkpointManager != null) {
checkpointManager.createResources();
}
// Pass in null Coordinator consumer and producer because ZK doesn't have coordinator streams.
- ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions);
+ ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
hasCreatedStreams = true;
}
@@ -348,7 +349,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
* Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
* to host mapping) is passed in as null when building the jobModel.
*/
- return JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
+ JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
+ // Nuke the configuration in JobModel.
+ return new JobModel(new MapConfig(), model.getContainers());
}
class LeaderElectorListenerImpl implements LeaderElectorListener {
http://git-wip-us.apache.org/repos/asf/samza/blob/38559796/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 97fe1f8..417398d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -280,6 +280,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion));
// Job model before and after the addition of second stream processor should be the same.
assertEquals(previousJobModel[0], updatedJobModel);
+
+ assertEquals(new MapConfig(), updatedJobModel.getConfig());
// TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
// ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
@@ -361,7 +363,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Task names in the job model should be different but the set of partitions should be the same and each task name
// should be assigned to a different container.
+ assertEquals(new MapConfig(), previousJobModel[0].getConfig());
assertEquals(previousJobModel[0].getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1);
+
+ assertEquals(new MapConfig(), updatedJobModel.getConfig());
assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1);
assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks().size(), 1);
Map<TaskName, TaskModel> updatedTaskModelMap1 = updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks();
@@ -406,6 +411,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Verifications before killing the leader.
String jobModelVersion = zkUtils.getJobModelVersion();
JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+ assertEquals(new MapConfig(), jobModel.getConfig());
assertEquals(3, jobModel.getContainers().size());
assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), jobModel.getContainers().keySet());
assertEquals("1", jobModelVersion);