You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/04 01:49:42 UTC

[1/7] helix git commit: Add protective check for ZooKeeper writing data that is bigger than 1MB

Repository: helix
Updated Branches:
  refs/heads/master 6775cd3ff -> d57882b9b


Add protective check for ZooKeeper writing data that is bigger than 1MB

ZooKeeper server drops connections for requests that try to write data larger than 1 MB, without returning any error code. When a Helix user does so, the request simply times out without giving a reason.

The Helix ZkClient is a wrapper around the I0Itec ZkClient. Add a size check in the Helix ZkClient wrapper so that, when the data is larger than 1 MB, the user gets an explicit error stating the reason instead of an unexplained timeout.

Add unit test.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f9f554e6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f9f554e6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f9f554e6

Branch: refs/heads/master
Commit: f9f554e68bfbffdfd8f87db76d546f7202f1541b
Parents: 6775cd3
Author: Weihan Kong <wk...@linkedin.com>
Authored: Mon Jan 23 17:18:00 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 14:49:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/helix/manager/zk/ZkClient.java   |  24 +++-
 .../org/apache/helix/TestZkClientWrapper.java   | 116 ------------------
 .../apache/helix/manager/zk/TestZkClient.java   | 122 +++++++++++++++++++
 3 files changed, 140 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 0a61e82..8f11eb3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -30,6 +30,8 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
@@ -287,7 +289,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
     long startT = System.nanoTime();
     try {
       final byte[] data = serialize(datat, path);
-
+      checkDataSizeLimit(data);
       retryUntilConnected(new Callable<Object>() {
 
         @Override
@@ -308,12 +310,13 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
       throws InterruptedException {
     long start = System.nanoTime();
     try {
-      final byte[] bytes = _zkSerializer.serialize(datat, path);
+      final byte[] data = _zkSerializer.serialize(datat, path);
+      checkDataSizeLimit(data);
       return retryUntilConnected(new Callable<Stat>() {
 
         @Override
         public Stat call() throws Exception {
-          return ((ZkConnection) _connection).getZookeeper().setData(path, bytes, expectedVersion);
+          return ((ZkConnection) _connection).getZookeeper().setData(path, data, expectedVersion);
         }
       });
     } finally {
@@ -325,7 +328,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   }
 
   @Override
-  public String create(final String path, Object data, final CreateMode mode)
+  public String create(final String path, Object datat, final CreateMode mode)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
     if (path == null) {
       throw new NullPointerException("path must not be null.");
@@ -333,13 +336,14 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
     long startT = System.nanoTime();
     try {
-      final byte[] bytes = data == null ? null : serialize(data, path);
+      final byte[] data = datat == null ? null : serialize(datat, path);
+      checkDataSizeLimit(data);
 
       return retryUntilConnected(new Callable<String>() {
 
         @Override
         public String call() throws Exception {
-          return _connection.create(path, bytes, mode);
+          return _connection.create(path, data, mode);
         }
       });
     } finally {
@@ -451,4 +455,12 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
       }
     });
   }
+
+  private void checkDataSizeLimit(byte[] data) {
+    if (data != null && data.length > ZNRecord.SIZE_LIMIT) {
+      LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): "
+          + new String(data).substring(0, 1024));
+      throw new HelixException("Data size larger than 1M");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
deleted file mode 100644
index bc3d266..0000000
--- a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestZkClientWrapper extends ZkUnitTestBase {
-  private static Logger LOG = Logger.getLogger(TestZkClientWrapper.class);
-
-  ZkClient _zkClient;
-
-  @BeforeClass
-  public void beforeClass() {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  @AfterClass
-  public void afterClass() {
-    _zkClient.close();
-  }
-
-  @Test()
-  void testGetStat() {
-    String path = "/tmp/getStatTest";
-    _zkClient.deleteRecursive(path);
-
-    Stat stat, newStat;
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNull(stat);
-    _zkClient.createPersistent(path, true);
-
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNotNull(stat);
-
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertEquals(stat, newStat);
-
-    _zkClient.writeData(path, new ZNRecord("Test"));
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertNotSame(stat, newStat);
-  }
-
-  @Test()
-  void testSessioExpire() throws Exception {
-    IZkStateListener listener = new IZkStateListener() {
-
-      @Override
-      public void handleStateChanged(KeeperState state) throws Exception {
-        System.out.println("In Old connection New state " + state);
-      }
-
-      @Override
-      public void handleNewSession() throws Exception {
-        System.out.println("In Old connection New session");
-      }
-
-      @Override
-      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
-      }
-    };
-
-    _zkClient.subscribeStateChanges(listener);
-    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
-    ZooKeeper zookeeper = connection.getZookeeper();
-    System.out.println("old sessionId= " + zookeeper.getSessionId());
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        System.out.println("In New connection In process event:" + event);
-      }
-    };
-    ZooKeeper newZookeeper =
-        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
-            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
-    Thread.sleep(3000);
-    System.out.println("New sessionId= " + newZookeeper.getSessionId());
-    Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    connection = ((ZkConnection) _zkClient.getConnection());
-    zookeeper = connection.getZookeeper();
-    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
new file mode 100644
index 0000000..0019d40
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
@@ -0,0 +1,122 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestZkClient extends ZkUnitTestBase {
+  private static Logger LOG = Logger.getLogger(TestZkClient.class);
+
+  ZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _zkClient.close();
+  }
+
+  @Test()
+  void testGetStat() {
+    String path = "/tmp/getStatTest";
+    _zkClient.deleteRecursive(path);
+
+    Stat stat, newStat;
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNull(stat);
+    _zkClient.createPersistent(path, true);
+
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNotNull(stat);
+
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertEquals(stat, newStat);
+
+    _zkClient.writeData(path, new ZNRecord("Test"));
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertNotSame(stat, newStat);
+  }
+
+  @Test()
+  void testSessioExpire() throws Exception {
+    IZkStateListener listener = new IZkStateListener() {
+
+      @Override
+      public void handleStateChanged(KeeperState state) throws Exception {
+        System.out.println("In Old connection New state " + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception {
+        System.out.println("In Old connection New session");
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+      }
+    };
+
+    _zkClient.subscribeStateChanges(listener);
+    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+    ZooKeeper zookeeper = connection.getZookeeper();
+    System.out.println("old sessionId= " + zookeeper.getSessionId());
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        System.out.println("In New connection In process event:" + event);
+      }
+    };
+    ZooKeeper newZookeeper =
+        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
+            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+    Thread.sleep(3000);
+    System.out.println("New sessionId= " + newZookeeper.getSessionId());
+    Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    connection = ((ZkConnection) _zkClient.getConnection());
+    zookeeper = connection.getZookeeper();
+    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+  }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
+  void testDataSizeLimit() {
+    ZNRecord data = new ZNRecord(new String(new char[1024*1024]));
+    _zkClient.writeData("/test", data, -1);
+  }
+}


[6/7] helix git commit: Config to change the interval of healthReport collection

Posted by jx...@apache.org.
Config to change the interval of healthReport collection

To better support health reports, the collection interval needs to be configurable. The setting is read from a Java system property (helixmanager.participantHealthReport.reportLatency) rather than from ZooKeeper, because no ZooKeeper connection exists yet when the health report tasks are created.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bd113a18
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bd113a18
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bd113a18

Branch: refs/heads/master
Commit: bd113a181a25611b7e2581f0bcbbce24d4aab1eb
Parents: 177d5bd
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Oct 3 15:22:47 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:22:47 2017 -0700

----------------------------------------------------------------------
 .../helix/healthcheck/ParticipantHealthReportTask.java      | 9 ++++++++-
 .../java/org/apache/helix/manager/zk/ZKHelixManager.java    | 9 +++++++--
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bd113a18/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
index f1c4c24..e0f38ba 100644
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
 public class ParticipantHealthReportTask extends HelixTimerTask {
   private static final Logger LOG = Logger.getLogger(ParticipantHealthReportTask.class);
   public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
+  private final int _reportLatency;
 
   Timer _timer;
   final ParticipantHealthReportCollectorImpl _healthReportCollector;
@@ -42,7 +43,13 @@ public class ParticipantHealthReportTask extends HelixTimerTask {
   }
 
   public ParticipantHealthReportTask(ParticipantHealthReportCollectorImpl healthReportCollector) {
+    this(healthReportCollector, DEFAULT_REPORT_LATENCY);
+  }
+
+  public ParticipantHealthReportTask(ParticipantHealthReportCollectorImpl healthReportCollector,
+      int reportLatency) {
     _healthReportCollector = healthReportCollector;
+    _reportLatency = reportLatency;
   }
 
   @Override
@@ -51,7 +58,7 @@ public class ParticipantHealthReportTask extends HelixTimerTask {
       LOG.info("Start HealthCheckInfoReportingTask");
       _timer = new Timer("ParticipantHealthReportTimerTask", true);
       _timer.scheduleAtFixedRate(new ParticipantHealthReportTimerTask(),
-          new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
+          new Random().nextInt(_reportLatency), _reportLatency);
     } else {
       LOG.warn("ParticipantHealthReportTimerTask already started");
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/bd113a18/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 670a65e..fdc6a55 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -100,6 +100,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
    * helix version#
    */
   private final String _version;
+  private int _reportLatency;
 
   protected ZkClient _zkclient = null;
   private final DefaultMessagingService _messagingService;
@@ -231,6 +232,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     _connectionRetryTimeout =
         getSystemPropertyAsInt("zk.connectionReEstablishment.timeout", DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
+    _reportLatency = getSystemPropertyAsInt("helixmanager.participantHealthReport.reportLatency",
+        ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY);
 
     /**
      * instance type specific init
@@ -240,7 +243,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _stateMachineEngine = new HelixStateMachineEngine(this);
       _participantHealthInfoCollector =
           new ParticipantHealthReportCollectorImpl(this, _instanceName);
-      _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
+      _timerTasks
+          .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
       break;
     case CONTROLLER:
       _stateMachineEngine = null;
@@ -253,7 +257,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _participantHealthInfoCollector =
           new ParticipantHealthReportCollectorImpl(this, _instanceName);
 
-      _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
+      _timerTasks
+          .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
       _controllerTimerTasks.add(new StatusDumpTask(this));
       break;
     case ADMINISTRATOR:


[7/7] helix git commit: [HELIX-668] Fix remove context with namespaced job name

Posted by jx...@apache.org.
[HELIX-668] Fix remove context with namespaced job name


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d57882b9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d57882b9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d57882b9

Branch: refs/heads/master
Commit: d57882b9b613a2203886e1ef0da74ccc077d64c3
Parents: bd113a1
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Oct 3 15:28:50 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 18:47:09 2017 -0700

----------------------------------------------------------------------
 helix-core/src/main/java/org/apache/helix/task/TaskDriver.java   | 2 +-
 .../apache/helix/integration/TestPartitionMovementThrottle.java  | 4 ++--
 .../integration/task/TestGenericTaskAssignmentCalculator.java    | 2 +-
 .../src/test/java/org/apache/helix/manager/zk/TestZkClient.java  | 2 +-
 4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d57882b9/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 97703f7..df5cdf6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -360,7 +360,7 @@ public class TaskDriver {
     removeJobStateFromQueue(queueName, jobName);
 
     // Delete the job from property store
-    TaskUtil.removeJobContext(_propertyStore, jobName);
+    TaskUtil.removeJobContext(_propertyStore, namespacedJobName);
   }
 
   /** Remove the job name from the DAG from the queue configuration */

http://git-wip-us.apache.org/repos/asf/helix/blob/d57882b9/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
index 7a87a0f..a2596b3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
@@ -268,13 +268,13 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
         */
         if (!resourcePatitionTransitionTimes.containsKey(message.getResourceName())) {
           resourcePatitionTransitionTimes
-              .put(message.getResourceName(), new ArrayList<PartitionTransitionTime>());
+              .put(message.getResourceName(), Collections.synchronizedList(new ArrayList<PartitionTransitionTime>()));
         }
         resourcePatitionTransitionTimes.get(message.getResourceName()).add(partitionTransitionTime);
 
         if (!instancePatitionTransitionTimes.containsKey(message.getTgtName())) {
           instancePatitionTransitionTimes
-              .put(message.getTgtName(), new ArrayList<PartitionTransitionTime>());
+              .put(message.getTgtName(), Collections.synchronizedList(new ArrayList<PartitionTransitionTime>()));
         }
         instancePatitionTransitionTimes.get(message.getTgtName()).add(partitionTransitionTime);
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/d57882b9/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
index cd6822a..030b7b9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -180,7 +180,7 @@ public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
       }
     }
 
-    Assert.assertEquals(abortedTask, 4);
+    Assert.assertTrue(abortedTask > 0);
   }
 
   private class TaskOne extends MockTask {

http://git-wip-us.apache.org/repos/asf/helix/blob/d57882b9/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
index 0019d40..1c86877 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
@@ -116,7 +116,7 @@ public class TestZkClient extends ZkUnitTestBase {
 
   @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
   void testDataSizeLimit() {
-    ZNRecord data = new ZNRecord(new String(new char[1024*1024]));
+    ZNRecord data = new ZNRecord(new String(new char[1024*1024*128]));
     _zkClient.writeData("/test", data, -1);
   }
 }


[3/7] helix git commit: Prevent ClusterControllerManager from starting multiple times

Posted by jx...@apache.org.
Prevent ClusterControllerManager from starting multiple times

ClusterControllerManager is a Runnable wrapper for a Helix Controller that can run on a separate thread for testing purposes. Since HelixManager.connect() should not be called more than once, this Controller should not be started more than once either.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/94f39618
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/94f39618
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/94f39618

Branch: refs/heads/master
Commit: 94f3961842263d04eef89019a1955e4c49e3305c
Parents: d5a2395
Author: Weihan Kong <wk...@linkedin.com>
Authored: Wed Feb 8 23:38:49 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:07:52 2017 -0700

----------------------------------------------------------------------
 .../integration/manager/ClusterControllerManager.java    | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/94f39618/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index 9e10771..92ed52b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -36,6 +36,8 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
   private final CountDownLatch _stopCountDown = new CountDownLatch(1);
   private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
 
+  private boolean _started = false;
+
   public ClusterControllerManager(String zkAddr, String clusterName) {
     this(zkAddr, clusterName, "controller");
   }
@@ -48,13 +50,20 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
     _stopCountDown.countDown();
     try {
       _waitStopFinishCountDown.await();
+      _started = false;
     } catch (InterruptedException e) {
       LOG.error("Interrupted waiting for finish", e);
     }
   }
 
+  // This should not be called more than once because HelixManager.connect() should not be called more than once.
   public void syncStart() {
-    // TODO: prevent start multiple times
+    if (_started) {
+      throw new RuntimeException("Helix Controller already started. Do not call syncStart() more than once.");
+    } else {
+      _started = true;
+    }
+
     new Thread(this).start();
     try {
       _startCountDown.await();


[4/7] helix git commit: Be able to stop workflow when no job is running.

Posted by jx...@apache.org.
Be able to stop workflow when no job is running.

Currently, to stop a workflow, its target state is set to STOP. Then, as each job (a resource in the ideal state) is processed by the job rebalancer, the rebalancer checks whether all jobs in the workflow are done (i.e., not IN_PROGRESS or STOPPING) and, if so, sets the workflow state to STOPPED.
However, if all the jobs are already done, there is no job in the ideal state to process, so the workflow state never gets a chance to be set to STOPPED.

This commit adds a check in workflow rebalancer to set the state when all jobs are already done.

A test is added specifically for this case.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/408082a3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/408082a3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/408082a3

Branch: refs/heads/master
Commit: 408082a33d91f84556c3da31232fb6d4097b4371
Parents: 94f3961
Author: Weihan Kong <wk...@linkedin.com>
Authored: Mon Feb 13 13:52:16 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:08:33 2017 -0700

----------------------------------------------------------------------
 .../apache/helix/task/WorkflowRebalancer.java   |  4 ++
 .../integration/task/TestStopWorkflow.java      | 45 ++++++++++++++++++++
 2 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 830f93a..8e72f7a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -76,6 +76,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     if (targetState == TargetState.STOP) {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
+      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+        TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+      }
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
new file mode 100644
index 0000000..b641698
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -0,0 +1,45 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStopWorkflow extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numParitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testStopWorkflow() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
+    jobQueue.enqueueJob("job2_will_fail", jobBuilder);
+    _driver.start(jobQueue.build());
+
+    // job1 should succeed and job2 should fail, wait until that happens
+    _driver.pollForJobState(jobQueueName,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
+
+    // Now stop the workflow, and it should be stopped because all jobs have completed or failed.
+    _driver.waitToStop(jobQueueName, 4000);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+  }
+}


[2/7] helix git commit: Fix TestBatchMessage test fail

Posted by jx...@apache.org.
Fix TestBatchMessage test fail

The test fails because an extra NO_OP message is sent when a new MessageHandlerFactory is registered.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5a2395d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5a2395d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5a2395d

Branch: refs/heads/master
Commit: d5a2395d207da618e767d1f07e20fa116d716cb3
Parents: f9f554e
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Feb 8 17:12:18 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:07:07 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/integration/TestBatchMessage.java  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d5a2395d/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
index e4a2990..9e62c0d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -105,7 +105,9 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
             clusterName));
     Assert.assertTrue(result);
-    Assert.assertTrue(listener._maxNbOfChilds <= 2,
+    // Change to three is because there is an extra factory registered
+    // So one extra NO_OP message send
+    Assert.assertTrue(listener._maxNbOfChilds <= 3,
         "Should get no more than 2 messages (O->S and S->M)");
 
     // clean up
@@ -185,7 +187,9 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
             clusterName));
     Assert.assertTrue(result);
-    Assert.assertTrue(listener._maxNbOfChilds <= 2,
+    // Change to three is because there is an extra factory registered
+    // So one extra NO_OP message send
+    Assert.assertTrue(listener._maxNbOfChilds <= 3,
         "Should get no more than 2 messages (O->S and S->M)");
 
     // clean up


[5/7] helix git commit: Add timeout in JobConfig

Posted by jx...@apache.org.
Add timeout in JobConfig

To support a job-level timeout in the task framework, add the configuration field. Associated changes are made in the builder and JobBean.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/177d5bdc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/177d5bdc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/177d5bdc

Branch: refs/heads/master
Commit: 177d5bdc29fc2011e12ca82d7bdf5456ef31a956
Parents: 408082a
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Oct 3 15:18:32 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:18:32 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 30 ++++++++++++++++++--
 .../java/org/apache/helix/task/TaskState.java   |  7 ++++-
 .../org/apache/helix/task/beans/JobBean.java    |  1 +
 3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 12aa058..6c3aed4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -80,6 +80,10 @@ public class JobConfig extends ResourceConfig {
      */
     JobCommandConfig,
     /**
+     * The allowed execution time of the job.
+     */
+    Timeout,
+    /**
      * The timeout for a task.
      */
     TimeoutPerPartition,
@@ -151,6 +155,7 @@ public class JobConfig extends ResourceConfig {
   }
 
   //Default property values
+  public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
   public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr.
   public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10;
@@ -171,7 +176,7 @@ public class JobConfig extends ResourceConfig {
   public JobConfig(String jobId, JobConfig jobConfig) {
     this(jobConfig.getWorkflow(), jobConfig.getTargetResource(), jobConfig.getTargetPartitions(),
         jobConfig.getTargetPartitionStates(), jobConfig.getCommand(),
-        jobConfig.getJobCommandConfigMap(), jobConfig.getTimeoutPerTask(),
+        jobConfig.getJobCommandConfigMap(), jobConfig.getTimeout(), jobConfig.getTimeoutPerTask(),
         jobConfig.getNumConcurrentTasksPerInstance(), jobConfig.getMaxAttemptsPerTask(),
         jobConfig.getMaxAttemptsPerTask(), jobConfig.getFailureThreshold(),
         jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(),
@@ -183,7 +188,7 @@ public class JobConfig extends ResourceConfig {
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
-      long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
+      long timeout, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
       boolean disableExternalView, boolean ignoreDependentJobFailure,
       Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag,
@@ -221,6 +226,7 @@ public class JobConfig extends ResourceConfig {
     if (executionStart > 0) {
       getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart);
     }
+    getRecord().setLongField(JobConfigProperty.Timeout.name(), timeout);
     getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask);
     getRecord().setIntField(JobConfigProperty.MaxAttemptsPerTask.name(), maxAttemptsPerTask);
     getRecord().setIntField(JobConfigProperty.MaxForcedReassignmentsPerTask.name(),
@@ -289,6 +295,10 @@ public class JobConfig extends ResourceConfig {
         : null;
   }
 
+  public long getTimeout() {
+    return getRecord().getLongField(JobConfigProperty.Timeout.name(), DEFAULT_TIMEOUT);
+  }
+
   public long getTimeoutPerTask() {
     return getRecord()
         .getLongField(JobConfigProperty.TimeoutPerPartition.name(), DEFAULT_TIMEOUT_PER_TASK);
@@ -389,6 +399,7 @@ public class JobConfig extends ResourceConfig {
     private String _command;
     private Map<String, String> _commandConfig;
     private Map<String, TaskConfig> _taskConfigMap = Maps.newHashMap();
+    private long _timeout = DEFAULT_TIMEOUT;
     private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK;
     private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
     private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK;
@@ -417,7 +428,7 @@ public class JobConfig extends ResourceConfig {
       validate();
 
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
-          _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
+          _command, _commandConfig, _timeout, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
           _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
           _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry,
@@ -456,6 +467,9 @@ public class JobConfig extends ResourceConfig {
             cfg.get(JobConfigProperty.JobCommandConfig.name()));
         b.setJobCommandConfigMap(commandConfigMap);
       }
+      if (cfg.containsKey(JobConfigProperty.Timeout.name())) {
+        b.setTimeout(Long.parseLong(cfg.get(JobConfigProperty.Timeout.name())));
+      }
       if (cfg.containsKey(JobConfigProperty.TimeoutPerPartition.name())) {
         b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TimeoutPerPartition.name())));
       }
@@ -544,6 +558,11 @@ public class JobConfig extends ResourceConfig {
       return this;
     }
 
+    public Builder setTimeout(long v) {
+      _timeout = v;
+      return this;
+    }
+
     public Builder setTimeoutPerTask(long v) {
       _timeoutPerTask = v;
       return this;
@@ -660,6 +679,10 @@ public class JobConfig extends ResourceConfig {
           }
         }
       }
+      if (_timeout < 0) {
+        throw new IllegalArgumentException(String
+            .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout));
+      }
       if (_timeoutPerTask < 0) {
         throw new IllegalArgumentException(String
             .format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition,
@@ -696,6 +719,7 @@ public class JobConfig extends ResourceConfig {
 
       b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask)
           .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance)
+          .setTimeout(jobBean.timeout)
           .setTimeoutPerTask(jobBean.timeoutPerPartition)
           .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay)
           .setDisableExternalView(jobBean.disableExternalView)

http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 4e12f2d..3713c40 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -50,5 +50,10 @@ public enum TaskState {
   /**
    * The task are aborted due to workflow fail
    */
-  ABORTED
+  ABORTED,
+  /**
+   * The allowed execution time for the job.
+   * TODO: also use this for the task
+   */
+  TIMED_OUT
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/177d5bdc/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 7b42ad2..8d2f259 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -38,6 +38,7 @@ public class JobBean {
   public String command;
   public Map<String, String> jobCommandConfigMap;
   public List<TaskBean> tasks;
+  public long timeout = JobConfig.DEFAULT_TIMEOUT;
   public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
   public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;