You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:48 UTC
[02/23] samza git commit: SAMZA-1324: Fix NullPointerException in
ZkUtils api's.
SAMZA-1324: Fix NullPointerException in ZkUtils api's.
Problem:
Read/Write api methods in ZkUtils updates counters/timers in `metrics` field. In a ZkUtils constructor this fields is not initialized properly. Java default for uninitialized field is null resulting in NPE.
Fix:
Initialize private fields of ZkUtils class with appropriate defaults.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #235 from shanthoosh/fix_zkutils_api
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ebb1b7fe
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ebb1b7fe
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ebb1b7fe
Branch: refs/heads/0.14.0
Commit: ebb1b7fea2518e827e589fe8523089322c68c9bf
Parents: be98993
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Wed Jul 12 11:13:09 2017 -0700
Committer: navina <na...@apache.org>
Committed: Wed Jul 12 11:13:09 2017 -0700
----------------------------------------------------------------------
.../versioned/container/metrics-table.html | 8 ++-
.../samza/zk/ZkCoordinationServiceFactory.java | 5 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 2 +-
.../samza/zk/ZkJobCoordinatorMetrics.java | 9 ----
.../main/java/org/apache/samza/zk/ZkUtils.java | 17 +++---
.../org/apache/samza/zk/ZkUtilsMetrics.java | 56 ++++++++++++++++++++
.../zk/TestZkBarrierForVersionUpgrade.java | 4 +-
.../apache/samza/zk/TestZkLeaderElector.java | 2 +-
.../apache/samza/zk/TestZkProcessorLatch.java | 4 +-
.../java/org/apache/samza/zk/TestZkUtils.java | 4 +-
.../processor/TestZkLocalApplicationRunner.java | 5 +-
11 files changed, 82 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/docs/learn/documentation/versioned/container/metrics-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 7fbbc40..e504fa3 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -142,6 +142,7 @@
<li><a href="#bootstrapping-chooser-metrics">BootstrappingChooserMetrics</a></li>
<li><a href="#hdfs-system-producer-metrics">HdfsSystemProducerMetrics</a></li>
<li><a href="#elasticsearch-system-producer-metrics">ElasticsearchSystemProducerMetrics</a></li>
+ <li><a href="#zookeeper-client-metrics">ZookeeperClientMetrics</a></li>
<li><a href="#zookeeper-job-coordinator-metrics">ZkJobCoordinatorMetrics</a></li>
</ul>
<p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names defined in configuration file or system variables defined while starting the job.</p>
@@ -894,7 +895,7 @@
</tr>
<tr>
- <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th>
+ <th colspan="2" class="section" id="zookeeper-client-metrics">org.apache.samza.zk.ZkUtilsMetrics</th>
</tr>
<tr>
<td>reads</td>
@@ -909,10 +910,13 @@
<td>Number of subscriptions to znodes in Zookeeper</td>
</tr>
<tr>
- <td>zk-connection-error</td>
+ <td>zk-connection-errors</td>
<td>Number of Zookeeper connection errors</td>
</tr>
<tr>
+ <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th>
+ </tr>
+ <tr>
<td>is-leader</td>
<td>Denotes if the processor is a leader or not</td>
</tr>
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index 20fcfa4..d0633a8 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -25,13 +25,14 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.CoordinationServiceFactory;
import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.zookeeper.client.ConnectStringParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
- private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
ZkConfig zkConfig = new ZkConfig(config);
@@ -39,7 +40,7 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory
ZkClient zkClient =
createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
- ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+ ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry());
return new ZkCoordinationUtils(participantId, zkConfig, zkUtils);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index f2fc3de..94c3054 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -80,7 +80,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
zkConfig.getZkConnect(),
zkConfig.getZkSessionTimeoutMs(),
zkConfig.getZkConnectionTimeoutMs()),
- zkConfig.getZkConnectionTimeoutMs(), metrics);
+ zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
this.processorId = createProcessorId(config);
LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
index 3437602..3d00897 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
@@ -31,11 +31,6 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
private final MetricsRegistry metricsRegistry;
- public final Counter reads;
- public final Counter writes;
- public final Counter subscriptions;
- public final Counter zkConnectionError;
-
/**
* Denotes if the processor is a leader or not
*/
@@ -65,10 +60,6 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) {
super(metricsRegistry);
this.metricsRegistry = metricsRegistry;
- this.reads = newCounter("reads");
- this.writes = newCounter("writes");
- this.subscriptions = newCounter("subscriptions");
- this.zkConnectionError = newCounter("zk-connection-error");
this.isLeader = newGauge("is-leader", false);
this.barrierCreation = newCounter("barrier-creation");
this.barrierStateChange = newCounter("barrier-state-change");
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index aa55ff7..ecf118b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.samza.SamzaException;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.map.ObjectMapper;
@@ -65,19 +66,13 @@ public class ZkUtils {
private volatile String ephemeralPath = null;
private final ZkKeyBuilder keyBuilder;
private final int connectionTimeoutMs;
- private ZkJobCoordinatorMetrics metrics;
+ private final ZkUtilsMetrics metrics;
- public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+ public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) {
this.keyBuilder = zkKeyBuilder;
this.connectionTimeoutMs = connectionTimeoutMs;
this.zkClient = zkClient;
- }
-
- public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, ZkJobCoordinatorMetrics metrics) {
- this.keyBuilder = zkKeyBuilder;
- this.connectionTimeoutMs = connectionTimeoutMs;
- this.zkClient = zkClient;
- this.metrics = metrics;
+ this.metrics = new ZkUtilsMetrics(metricsRegistry);
}
public void connect() throws ZkInterruptedException {
@@ -269,7 +264,9 @@ public class ZkUtils {
* @return jobmodel version as a string
*/
public String getJobModelVersion() {
- return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
+ String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath());
+ metrics.reads.inc();
+ return jobModelVersion;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
new file mode 100644
index 0000000..b9f4aa8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Contains all the metrics published by {@link ZkUtils}.
+ */
+public class ZkUtilsMetrics extends MetricsBase {
+ /**
+ * Number of data reads from zookeeper.
+ */
+ public final Counter reads;
+
+ /**
+ * Number of data writes into zookeeper.
+ */
+ public final Counter writes;
+
+ /**
+ * Number of subscriptions created with zookeeper.
+ */
+ public final Counter subscriptions;
+
+ /**
+ * Number of zookeeper connection errors in ZkClient.
+ */
+ public final Counter zkConnectionError;
+
+ public ZkUtilsMetrics(MetricsRegistry metricsRegistry) {
+ super(metricsRegistry);
+ this.reads = newCounter("reads");
+ this.writes = newCounter("writes");
+ this.subscriptions = newCounter("subscriptions");
+ this.zkConnectionError = newCounter("zk-connection-errors");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 49cd280..3dd1bd5 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -53,9 +53,9 @@ public class TestZkBarrierForVersionUpgrade {
@Before
public void testSetup() {
ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
- this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+ this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
- this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+ this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
}
@After
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 993297b..3ff9175 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -437,6 +437,6 @@ public class TestZkLeaderElector {
return new ZkUtils(
KEY_BUILDER,
zkClient,
- CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+ CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 9f089a0..b2a5533 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -219,6 +220,7 @@ public class TestZkProcessorLatch {
return new ZkUtils(
KEY_BUILDER,
zkClient,
- CONNECTION_TIMEOUT_MS);
+ CONNECTION_TIMEOUT_MS,
+ new NoOpMetricsRegistry());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 9e33484..a33bf03 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -71,7 +71,7 @@ public class TestZkUtils {
zkUtils = new ZkUtils(
KEY_BUILDER,
zkClient,
- SESSION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+ SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
zkUtils.connect();
}
@@ -110,7 +110,7 @@ public class TestZkUtils {
zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
List<String> l = zkUtils.getSortedActiveProcessorsIDs();
Assert.assertEquals(1, l.size());
- new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS).registerProcessorAndGetId(new ProcessorData("host2", "2"));
+ new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2"));
l = zkUtils.getSortedActiveProcessorsIDs();
Assert.assertEquals(2, l.size());
http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 4865647..2d5da2b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -54,7 +54,6 @@ import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.test.StandaloneIntegrationTestHarness;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.util.NoOpMetricsRegistry;
-import org.apache.samza.zk.ZkJobCoordinatorMetrics;
import org.apache.samza.zk.ZkKeyBuilder;
import org.apache.samza.zk.ZkUtils;
import org.junit.Rule;
@@ -94,7 +93,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private LocalApplicationRunner applicationRunner1;
private LocalApplicationRunner applicationRunner2;
private LocalApplicationRunner applicationRunner3;
- private ZkJobCoordinatorMetrics zkJobCoordinatorMetrics;
// Set 90 seconds as max execution time for each test.
@Rule
@@ -110,8 +108,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
ZkClient zkClient = new ZkClient(zkConnect());
ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(String.format("app-%s-%s", testStreamAppName, testStreamAppId));
- zkJobCoordinatorMetrics = new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry());
- zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, zkJobCoordinatorMetrics);
+ zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
zkUtils.connect();
// Set up stream application configs with different processorIds and same testStreamAppName, testStreamAppId.