Posted to dev@twill.apache.org by chtyim <gi...@git.apache.org> on 2018/03/09 22:13:50 UTC

[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

GitHub user chtyim opened a pull request:

    https://github.com/apache/twill/pull/67

    (TWILL-61) Fix to allow higher attempts to relaunch the app after the first attempt failed

    - Delete the Kafka root zk node for the application if it already exists
    - Delete the AM instance zk node if it already exists
    - For the runnables parent zk node, it is not an error if it already exists
    - Enhance the KafkaClient publisher / consumer to deal with Kafka cluster changes
      - When the AM is killed and restarted, the embedded Kafka will be running on a different host and port
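
The first three bullets describe two different policies for a node that survives from an earlier attempt: delete and recreate it (Kafka root, AM instance node), or accept it as-is (runnables parent). Below is a minimal in-memory sketch of that distinction. The class, the `OnExists` enum and the paths are hypothetical, not the Twill or ZooKeeper API, and because the map is flat its delete is not recursive the way deleting a real Kafka root has to be.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory namespace (not the ZooKeeper or Twill API) contrasting the two
// "node already exists" policies from the PR description.
class NodeCreationPolicies {
  enum OnExists { IGNORE, DELETE_AND_RECREATE }

  static final class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("Node already exists: " + path);
    }
  }

  private final Map<String, String> nodes = new HashMap<>();

  // Plain create: fails if the path is already taken.
  private void createOrFail(String path, String data) {
    if (nodes.putIfAbsent(path, data) != null) {
      throw new NodeExistsException(path);
    }
  }

  /** Creates the path, applying the policy if it already exists; returns the data stored afterwards. */
  String create(String path, String data, OnExists policy) {
    try {
      createOrFail(path, data);
    } catch (NodeExistsException e) {
      if (policy == OnExists.DELETE_AND_RECREATE) {
        nodes.remove(path);        // discard what the previous attempt left behind (flat map: not recursive)
        createOrFail(path, data);  // recreate it for the current attempt
      }
      // OnExists.IGNORE: an existing node is not an error; keep it as-is.
    }
    return nodes.get(path);
  }

  public static void main(String[] args) {
    NodeCreationPolicies zk = new NodeCreationPolicies();
    // The first attempt creates both nodes.
    zk.create("/app/kafka", "attempt-1", OnExists.DELETE_AND_RECREATE);
    zk.create("/app/runnables", "attempt-1", OnExists.IGNORE);
    // A later attempt finds both nodes still present.
    System.out.println(zk.create("/app/kafka", "attempt-2", OnExists.DELETE_AND_RECREATE)); // attempt-2
    System.out.println(zk.create("/app/runnables", "attempt-2", OnExists.IGNORE));          // attempt-1
  }
}
```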

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chtyim/twill feature/TWILL-61

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/twill/pull/67.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #67
    
----
commit 9decca071a9e067b30be2150a6097463c939b6af
Author: Terence Yim <ch...@...>
Date:   2018-03-09T20:21:26Z

    (TWILL-61) Fix to allow higher attempts to relaunch the app after the first attempt failed
    
    - Delete the Kafka root zk node for the application if it already exists
    - Delete the AM instance zk node if it already exists
    - For the runnables parent zk node, it is not an error if it already exists
    - Enhance the KafkaClient publisher / consumer to deal with Kafka cluster changes
      - When the AM is killed and restarted, the embedded Kafka will be running on a different host and port

----


---

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174617783
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    --- End diff --
    
    An ephemeral node doesn't go away immediately when the process dies. It stays there until the ZK session times out, which typically takes several seconds. In the meantime, YARN may already have started the next AM process, so the new AM process will see the ephemeral node.
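
A toy timeline model of this, assuming a 10-second session timeout purely for illustration. The class and its methods are hypothetical and are not the ZooKeeper API: the first AM's node outlives the process until its session expires, so an attempt started inside that window finds the node still there.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not the ZooKeeper API): an ephemeral node lives as long as its owning
// session, and the session only ends after the server has not heard from the client for longer
// than the session timeout, not at the moment the process dies.
class EphemeralNodeModel {
  private final long sessionTimeoutMs;
  private final Map<Long, Long> lastHeardMs = new HashMap<>();     // sessionId -> last contact time
  private final Map<String, Long> ephemeralOwner = new HashMap<>(); // path -> owning sessionId

  EphemeralNodeModel(long sessionTimeoutMs) {
    this.sessionTimeoutMs = sessionTimeoutMs;
  }

  void heartbeat(long sessionId, long nowMs) {
    lastHeardMs.put(sessionId, nowMs);
  }

  void createEphemeral(String path, long sessionId, long nowMs) {
    expireSessions(nowMs);
    if (ephemeralOwner.containsKey(path)) {
      throw new IllegalStateException("NodeExists: " + path);
    }
    heartbeat(sessionId, nowMs);
    ephemeralOwner.put(path, sessionId);
  }

  boolean exists(String path, long nowMs) {
    expireSessions(nowMs);
    return ephemeralOwner.containsKey(path);
  }

  // Drops sessions silent for longer than the timeout, together with their ephemeral nodes.
  private void expireSessions(long nowMs) {
    lastHeardMs.entrySet().removeIf(session -> {
      boolean expired = nowMs - session.getValue() > sessionTimeoutMs;
      if (expired) {
        ephemeralOwner.values().removeIf(owner -> owner.equals(session.getKey()));
      }
      return expired;
    });
  }

  public static void main(String[] args) {
    EphemeralNodeModel zk = new EphemeralNodeModel(10_000);  // illustrative 10 s timeout
    zk.createEphemeral("/instances/am", 1L, 0);              // attempt 1 registers, then dies at t = 0
    System.out.println(zk.exists("/instances/am", 2_000));   // true: attempt 2 started before expiry
    System.out.println(zk.exists("/instances/am", 10_001));  // false: session 1 has expired
  }
}
```

Rather than waiting out that window, the PR has the new attempt delete the leftover node and create its own.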


---

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174618874
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    --- End diff --
    
    That's right. Missed it.


---

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174618087
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    +        // Try to delete the node and recreate it
    +        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
    +        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() {
    +          @Override
    +          public void onSuccess(String result) {
    +            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    +                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
    +          }
    +
    +          @Override
    +          public void onFailure(Throwable t) {
    +            createFailure.addSuppressed(t);
    +            resultFuture.setException(createFailure);
    +          }
    +        }, Threads.SAME_THREAD_EXECUTOR);
    +      }
    +    }, Threads.SAME_THREAD_EXECUTOR);
    +
    +    return resultFuture;
    --- End diff --
    
    Well, we do need that many levels of callback (create -> delete -> create) for the operation. Any suggestions on how to simplify it?


---

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174618395
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    +        // Try to delete the node and recreate it
    +        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
    +        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() {
    +          @Override
    +          public void onSuccess(String result) {
    +            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    +                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
    +          }
    +
    +          @Override
    +          public void onFailure(Throwable t) {
    +            createFailure.addSuppressed(t);
    +            resultFuture.setException(createFailure);
    +          }
    +        }, Threads.SAME_THREAD_EXECUTOR);
    +      }
    +    }, Threads.SAME_THREAD_EXECUTOR);
    +
    +    return resultFuture;
    --- End diff --
    
    I can pull the common code between this and the ApplicationMasterMain class into a util function. But still, inside the util function, there would be three callbacks.


---

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174638923
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    --- End diff --
    
    got it


---

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174573795
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    +        // Try to delete the node and recreate it
    +        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
    +        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() {
    +          @Override
    +          public void onSuccess(String result) {
    +            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    +                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
    +          }
    +
    +          @Override
    +          public void onFailure(Throwable t) {
    +            createFailure.addSuppressed(t);
    +            resultFuture.setException(createFailure);
    +          }
    +        }, Threads.SAME_THREAD_EXECUTOR);
    +      }
    +    }, Threads.SAME_THREAD_EXECUTOR);
    +
    +    return resultFuture;
    --- End diff --
    
    While this code appears correct (after addressing my comment), the three nested levels of callbacks make it almost impossible to read. Is there some way to unwind this?
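
One possible way to unwind the nesting, sketched with `java.util.concurrent.CompletableFuture` instead of the Guava/Twill futures used in the diff. This is a swapped-in technique, not what the PR does. Each step returns a future, and `handle(...).thenCompose(Function.identity())` chains the next one, so create -> delete -> create reads top to bottom. `FakeAsyncZk` and the two exception classes are hypothetical stand-ins. Unlike the diff, which re-enters `thisCallback` after a successful delete, this sketch retries the create only once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class FlattenedCreate {
  static final class NodeExistsException extends RuntimeException {
    NodeExistsException(String path) {
      super("NodeExists: " + path);
    }
  }

  static final class NoNodeException extends RuntimeException {
    NoNodeException(String path) {
      super("NoNode: " + path);
    }
  }

  // Hypothetical async client; every call completes immediately.
  static final class FakeAsyncZk {
    final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    CompletableFuture<String> create(String path, String data) {
      if (nodes.putIfAbsent(path, data) == null) {
        return CompletableFuture.completedFuture(path);
      }
      return failed(new NodeExistsException(path));
    }

    CompletableFuture<String> delete(String path) {
      if (nodes.remove(path) != null) {
        return CompletableFuture.completedFuture(path);
      }
      return failed(new NoNodeException(path));
    }
  }

  static <T> CompletableFuture<T> failed(Throwable t) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(t);
    return future;
  }

  // Dependent stages may wrap the original failure in a CompletionException.
  static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
  }

  /** create; on NodeExists, delete (tolerating NoNode) and create once more. */
  static CompletableFuture<String> createDeleteIfExists(FakeAsyncZk zk, String path, String data) {
    return zk.create(path, data)
        .<CompletableFuture<String>>handle((created, createFailure) -> {
          if (createFailure == null) {
            return CompletableFuture.completedFuture(created);
          }
          Throwable cause = unwrap(createFailure);
          if (!(cause instanceof NodeExistsException)) {
            return failed(cause);  // any other failure ends the operation here
          }
          return zk.delete(path)
              .<CompletableFuture<String>>handle((deleted, deleteFailure) -> {
                if (deleteFailure == null || unwrap(deleteFailure) instanceof NoNodeException) {
                  return zk.create(path, data);  // node is gone either way: recreate it
                }
                cause.addSuppressed(unwrap(deleteFailure));
                return failed(cause);
              })
              .thenCompose(Function.identity());
        })
        .thenCompose(Function.identity());
  }

  public static void main(String[] args) {
    FakeAsyncZk zk = new FakeAsyncZk();
    zk.nodes.put("/live", "stale");  // left behind by a previous attempt
    System.out.println(createDeleteIfExists(zk, "/live", "fresh").join()); // /live
    System.out.println(zk.nodes.get("/live"));                              // fresh
  }
}
```

The `handle(...)` plus `thenCompose(Function.identity())` pairing is used because `CompletionStage.exceptionallyCompose` only arrived in Java 12. On newer JDKs that method would express the recovery step more directly.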


---

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174571435
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    --- End diff --
    
    do you need to return here?
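
A reduced, hypothetical version of the branch in question shows why the `return` matters. It records only which actions run. Without the `return`, a failure other than `NodeExistsException` still fails the result future and then falls through to the delete-and-recreate path anyway. Here an `IOException` stands in for something like a connection loss.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the onFailure branch under review: it only records which
// actions would be taken, so the effect of the missing `return` is visible.
class MissingReturnDemo {
  static final class NodeExistsException extends Exception {}

  static List<String> onFailure(Throwable createFailure, boolean withEarlyReturn) {
    List<String> actions = new ArrayList<>();
    if (!(createFailure instanceof NodeExistsException)) {
      actions.add("fail result future");
      if (withEarlyReturn) {
        return actions;
      }
    }
    // Reached for NodeExistsException, and also for every other failure when the return is missing.
    actions.add("delete node and recreate");
    return actions;
  }

  public static void main(String[] args) {
    Throwable connectionLoss = new IOException("connection loss");
    System.out.println(onFailure(connectionLoss, false));           // [fail result future, delete node and recreate]
    System.out.println(onFailure(connectionLoss, true));            // [fail result future]
    System.out.println(onFailure(new NodeExistsException(), true)); // [delete node and recreate]
  }
}
```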


---

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174626043
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java ---
    @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
           }
     
           String newBrokerList = brokerService.getBrokerList();
    -      if (newBrokerList.isEmpty()) {
    -        LOG.warn("Broker list is empty. No Kafka producer is created.");
    -        return;
    -      }
     
    +      // If there is no change, whether it is empty or not, just return
           if (Objects.equal(brokerList, newBrokerList)) {
             return;
           }
     
    -      Properties props = new Properties();
    -      props.put("metadata.broker.list", newBrokerList);
    -      props.put("serializer.class", ByteBufferEncoder.class.getName());
    -      props.put("key.serializer.class", IntegerEncoder.class.getName());
    -      props.put("partitioner.class", IntegerPartitioner.class.getName());
    -      props.put("request.required.acks", Integer.toString(ack.getAck()));
    -      props.put("compression.codec", compression.getCodec());
    +      Producer<Integer, ByteBuffer> newProducer = null;
    +      if (!newBrokerList.isEmpty()) {
    +        Properties props = new Properties();
    +        props.put("metadata.broker.list", newBrokerList);
    +        props.put("serializer.class", ByteBufferEncoder.class.getName());
    +        props.put("key.serializer.class", IntegerEncoder.class.getName());
    +        props.put("partitioner.class", IntegerPartitioner.class.getName());
    +        props.put("request.required.acks", Integer.toString(ack.getAck()));
    +        props.put("compression.codec", compression.getCodec());
    +
    +        ProducerConfig config = new ProducerConfig(props);
    +        newProducer = new Producer<>(config);
    +      }
     
    -      ProducerConfig config = new ProducerConfig(props);
    -      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config));
    +      // If the broker list is empty, the producer will be set to null
    +      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer);
           if (oldProducer != null) {
             oldProducer.close();
           }
     
    -      LOG.info("Update Kafka producer broker list: {}", newBrokerList);
    +      if (newBrokerList.isEmpty()) {
    +        LOG.warn("Empty Kafka producer broker list, publish will fail.");
    --- End diff --
    
    Yes
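
A minimal sketch of the broker-list handling in this hunk, with a hypothetical `FakeProducer` in place of the Kafka `Producer`. An unchanged list (empty or not) is a no-op. A changed list swaps the producer atomically and closes the old one. An empty list leaves no producer at all, so publishing fails. The sketch assumes the last-seen list is recorded after each change, which the quoted hunk does not show.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the broker-list handling in the diff; FakeProducer stands in for
// the Kafka Producer and only records whether it has been closed.
class ProducerSwapSketch {
  static final class FakeProducer {
    final String brokerList;
    volatile boolean closed;

    FakeProducer(String brokerList) {
      this.brokerList = brokerList;
    }

    void close() {
      closed = true;
    }
  }

  private final AtomicReference<FakeProducer> producer = new AtomicReference<>();
  private String brokerList; // last list seen (assumed to be recorded after each change)

  void changed(String newBrokerList) {
    // No change, whether empty or not: keep the current producer.
    if (Objects.equals(brokerList, newBrokerList)) {
      return;
    }
    // An empty list now closes and clears the current producer instead of being ignored.
    FakeProducer newProducer = newBrokerList.isEmpty() ? null : new FakeProducer(newBrokerList);
    FakeProducer oldProducer = producer.getAndSet(newProducer);
    if (oldProducer != null) {
      oldProducer.close();
    }
    brokerList = newBrokerList;
  }

  FakeProducer current() {
    return producer.get();
  }
}
```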


---

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174643542
  
    --- Diff: twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java ---
    @@ -281,6 +286,73 @@ public void onFailure(Throwable t) {
         return resultFuture;
       }
     
    +  /**
    +   * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen
    +   * and the creation will be retried.
    +   */
    +  public static OperationFuture<String> createDeleteIfExists(final ZKClient zkClient, final String path,
    +                                                             @Nullable final byte[] data, final CreateMode createMode,
    +                                                             final boolean createParent, final ACL...acls) {
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(path,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    final List<ACL> createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls);
    +    createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback<String>() {
    +
    +      final FutureCallback<String> createCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        // Create succeeded, just set the result to the resultFuture
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        // If create failed not because of the NodeExistsException, just set the exception to the result future
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +          return;
    +        }
    +
    +        // Try to delete the path
    +        LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path);
    +        Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback<String>() {
    +          @Override
    +          public void onSuccess(String result) {
    +            // If delete succeeded, perform the creation again.
    +            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
    +          }
    +
    +          @Override
    +          public void onFailure(Throwable t) {
    +            // If deletion failed for any reason other than NoNodeException, fail the result operation future
    +            if (!(t instanceof KeeperException.NoNodeException)) {
    +              createFailure.addSuppressed(t);
    +              resultFuture.setException(createFailure);
    +              return;
    +            }
    +
    +            // If can't delete because the node no longer exists, just go ahead and recreate the node
    +            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
    +          }
    +        }, Threads.SAME_THREAD_EXECUTOR);
    +      }
    +    });
    +
    +    return resultFuture;
    +  }
    +
    +  /**
    +   * Private helper method to create a ZK node based on the parameter. The result of the creation is always
    +   * communicated via the provided {@link FutureCallback}.
    +   */
    +  private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data,
    +                                 CreateMode createMode, boolean createParent,
    +                                 Iterable<ACL> acls, FutureCallback<String> callback) {
    +    Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls),
    +                        callback, Threads.SAME_THREAD_EXECUTOR);
    +  }
    --- End diff --
    
    yes, easier to understand and extracted into separate methods. Like it much better now 


---

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174571807
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    --- End diff --
    
    how can it leave an ephemeral node? Doesn't that mean there must be a zombie process holding on to that node?


---

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174583711
  
    --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---
    @@ -165,9 +168,47 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
     
         @Override
         protected void startUp() throws Exception {
    -      ZKOperations.ignoreError(
    -        zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
    -        KeeperException.NodeExistsException.class, kafkaZKPath).get();
    +      // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
    +      // no left over content from previous AM attempt.
    +      final SettableOperationFuture<String> completion = SettableOperationFuture.create(kafkaZKPath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
    +      Futures.addCallback(zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), new FutureCallback<String>() {
    +
    +        final FutureCallback<String> thisCallback = this;
    +
    +        @Override
    +        public void onSuccess(String result) {
    +          completion.set(result);
    +        }
    +
    +        @Override
    +        public void onFailure(final Throwable createFailure) {
    +          if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +            completion.setException(createFailure);
    +          }
    --- End diff --
    
    return here?
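For illustration, a minimal sketch of the callback shape with that early return in place. It uses `java.util.concurrent.CompletableFuture` instead of Twill's `OperationFuture`/`SettableOperationFuture`; `InMemoryStore`, `NodeExistsException` and `NoNodeException` are hypothetical stand-ins for the ZK client and the `KeeperException` subclasses, and the `createError` field exists only to simulate a failure other than NodeExists. It also retries when the delete reports the node is already gone.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

public class CreateOrReplace {

  // Hypothetical stand-ins for ZooKeeper's NodeExists / NoNode errors.
  static class NodeExistsException extends Exception {
    NodeExistsException(String path) { super("Node exists: " + path); }
  }

  static class NoNodeException extends Exception {
    NoNodeException(String path) { super("No node: " + path); }
  }

  // Hypothetical in-memory store with an async, ZK-like create/delete API.
  static class InMemoryStore {
    final Map<String, String> nodes = new ConcurrentHashMap<>();
    Throwable createError;  // if set, every create fails with it (e.g. a simulated connection loss)
    int deleteCalls;

    CompletableFuture<String> create(String path, String data) {
      CompletableFuture<String> f = new CompletableFuture<>();
      if (createError != null) {
        f.completeExceptionally(createError);
      } else if (nodes.putIfAbsent(path, data) == null) {
        f.complete(path);
      } else {
        f.completeExceptionally(new NodeExistsException(path));
      }
      return f;
    }

    CompletableFuture<String> delete(String path) {
      deleteCalls++;
      CompletableFuture<String> f = new CompletableFuture<>();
      if (nodes.remove(path) != null) {
        f.complete(path);
      } else {
        f.completeExceptionally(new NoNodeException(path));
      }
      return f;
    }
  }

  static CompletableFuture<String> createOrReplace(InMemoryStore store, String path, String data) {
    CompletableFuture<String> result = new CompletableFuture<>();
    attempt(store, path, data, result);
    return result;
  }

  private static void attempt(InMemoryStore store, String path, String data,
                              CompletableFuture<String> result) {
    store.create(path, data).whenComplete((created, createFailure) -> {
      if (createFailure == null) {
        result.complete(created);
        return;
      }
      Throwable cause = unwrap(createFailure);
      if (!(cause instanceof NodeExistsException)) {
        result.completeExceptionally(cause);
        return;  // without this return, control would fall through to the delete below
      }
      // Node left over from a previous attempt: delete it, then create again.
      store.delete(path).whenComplete((deleted, deleteFailure) -> {
        if (deleteFailure == null || unwrap(deleteFailure) instanceof NoNodeException) {
          attempt(store, path, data, result);  // NoNode means it is already gone, so just retry
        } else {
          cause.addSuppressed(unwrap(deleteFailure));
          result.completeExceptionally(cause);
        }
      });
    });
  }

  private static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
  }

  public static void main(String[] args) {
    InMemoryStore store = new InMemoryStore();
    store.nodes.put("/demo/live", "stale");  // simulate a node left by an earlier attempt
    String path = createOrReplace(store, "/demo/live", "fresh").join();
    System.out.println(path + " -> " + store.nodes.get(path));
  }
}
```

Retries are unbounded here, as in the diff; a production version might cap them.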


---

[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174580690
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java ---
    @@ -159,30 +159,37 @@ public void changed(BrokerService brokerService) {
           }
     
           String newBrokerList = brokerService.getBrokerList();
    -      if (newBrokerList.isEmpty()) {
    -        LOG.warn("Broker list is empty. No Kafka producer is created.");
    -        return;
    -      }
     
    +      // If there is no change, whether it is empty or not, just return
           if (Objects.equal(brokerList, newBrokerList)) {
             return;
           }
     
    -      Properties props = new Properties();
    -      props.put("metadata.broker.list", newBrokerList);
    -      props.put("serializer.class", ByteBufferEncoder.class.getName());
    -      props.put("key.serializer.class", IntegerEncoder.class.getName());
    -      props.put("partitioner.class", IntegerPartitioner.class.getName());
    -      props.put("request.required.acks", Integer.toString(ack.getAck()));
    -      props.put("compression.codec", compression.getCodec());
    +      Producer<Integer, ByteBuffer> newProducer = null;
    +      if (!newBrokerList.isEmpty()) {
    +        Properties props = new Properties();
    +        props.put("metadata.broker.list", newBrokerList);
    +        props.put("serializer.class", ByteBufferEncoder.class.getName());
    +        props.put("key.serializer.class", IntegerEncoder.class.getName());
    +        props.put("partitioner.class", IntegerPartitioner.class.getName());
    +        props.put("request.required.acks", Integer.toString(ack.getAck()));
    +        props.put("compression.codec", compression.getCodec());
    +
    +        ProducerConfig config = new ProducerConfig(props);
    +        newProducer = new Producer<>(config);
    +      }
     
    -      ProducerConfig config = new ProducerConfig(props);
    -      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config));
    +      // If the broker list is empty, the producer will be set to null
    +      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer);
           if (oldProducer != null) {
             oldProducer.close();
           }
     
    -      LOG.info("Update Kafka producer broker list: {}", newBrokerList);
    +      if (newBrokerList.isEmpty()) {
    +        LOG.warn("Empty Kafka producer broker list, publish will fail.");
    --- End diff --
    
    So when will this happen? If the AM dies (and its broker with it)? 
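Going by the PR description, this presumably covers the window after the AM is killed: its embedded Kafka broker goes away, and the next attempt's broker comes up on a different host and port. Below is a minimal, Kafka-free sketch of the swap pattern in the diff, assuming a hypothetical `FakeProducer` in place of the Kafka `Producer`. Each broker-list change atomically replaces the producer and closes the old one, and an empty list leaves the reference `null` so that publishing fails fast.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ProducerSwap {

  // Hypothetical stand-in for a Kafka producer bound to one broker list.
  static final class FakeProducer implements AutoCloseable {
    final String brokerList;
    volatile boolean closed;

    FakeProducer(String brokerList) {
      this.brokerList = brokerList;
    }

    void send(String message) {
      if (closed) {
        throw new IllegalStateException("Producer closed");
      }
      // A real producer would send the message to one of the brokers in brokerList.
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  private final AtomicReference<FakeProducer> producer = new AtomicReference<>();
  private String brokerList;  // last broker list seen; null until the first change event

  // Invoked by broker discovery whenever the broker list may have changed.
  synchronized void changed(String newBrokerList) {
    if (newBrokerList.equals(brokerList)) {
      return;  // no change, whether empty or not
    }
    // An empty list yields a null producer, so publishing fails instead of using stale brokers.
    FakeProducer newProducer = newBrokerList.isEmpty() ? null : new FakeProducer(newBrokerList);
    FakeProducer oldProducer = producer.getAndSet(newProducer);
    if (oldProducer != null) {
      oldProducer.close();
    }
    brokerList = newBrokerList;
  }

  void publish(String message) {
    FakeProducer p = producer.get();
    if (p == null) {
      throw new IllegalStateException("No Kafka broker available, publish failed");
    }
    p.send(message);
  }

  FakeProducer current() {
    return producer.get();
  }

  public static void main(String[] args) {
    ProducerSwap publisher = new ProducerSwap();
    publisher.changed("hostA:9092");  // broker of the first AM attempt
    publisher.publish("hello");
    publisher.changed("");            // broker gone together with the AM
    publisher.changed("hostB:9093");  // next attempt's broker on a different host and port
    publisher.publish("hello again");
    System.out.println(publisher.current().brokerList);
  }
}
```

A `publish` racing with `changed` can still pick up a producer that is closed right after; the sketch only guarantees that no new sends go to a broker list that has been replaced.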


---

[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174644467
  
    --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---
    @@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
         protected void startUp() throws Exception {
           // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
           // no left over content from previous AM attempt.
    -      final SettableOperationFuture<String> completion = SettableOperationFuture.create(kafkaZKPath,
    -                                                                                        Threads.SAME_THREAD_EXECUTOR);
    -      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
    --- End diff --
    
    oh yeah, accidentally removed.


---

[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174643070
  
    --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---
    @@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
         protected void startUp() throws Exception {
           // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
           // no left over content from previous AM attempt.
    -      final SettableOperationFuture<String> completion = SettableOperationFuture.create(kafkaZKPath,
    -                                                                                        Threads.SAME_THREAD_EXECUTOR);
    -      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
    --- End diff --
    
    maybe keep the log message?


---

[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/twill/pull/67


---

[GitHub] twill pull request #67: (TWILL-61) Fix to allow higher attempts to relaunch ...

Posted by anew <gi...@git.apache.org>.
Github user anew commented on a diff in the pull request:

    https://github.com/apache/twill/pull/67#discussion_r174639182
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---
    @@ -216,11 +217,52 @@ protected final void shutDown() throws Exception {
         return zkClient.setData(liveNodePath, serializeLiveNode());
       }
     
    +  /**
    +   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
    +   *
    +   * @return A {@link OperationFuture} that will be completed when the creation is done.
    +   */
       private OperationFuture<String> createLiveNode() {
    -    String liveNodePath = getLiveNodePath();
    +    final String liveNodePath = getLiveNodePath();
         LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
    -    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    -                                    KeeperException.NodeExistsException.class, liveNodePath);
    +    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(liveNodePath,
    +                                                                                        Threads.SAME_THREAD_EXECUTOR);
    +    OperationFuture<String> createFuture = zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL);
    +    Futures.addCallback(createFuture, new FutureCallback<String>() {
    +      final FutureCallback<String> thisCallback = this;
    +
    +      @Override
    +      public void onSuccess(String result) {
    +        LOG.info("Live node created {}{}", zkClient.getConnectString(), liveNodePath);
    +        resultFuture.set(result);
    +      }
    +
    +      @Override
    +      public void onFailure(final Throwable createFailure) {
    +        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    +          resultFuture.setException(createFailure);
    +        }
    +
    +        // If the node already exists, it is due to previous run attempt that left an ephemeral node.
    +        // Try to delete the node and recreate it
    +        LOG.info("Live node already exist. Deleting node {}{}", zkClient.getConnectString(), liveNodePath);
    +        Futures.addCallback(zkClient.delete(liveNodePath), new FutureCallback<String>() {
    +          @Override
    +          public void onSuccess(String result) {
    +            Futures.addCallback(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
    +                                thisCallback, Threads.SAME_THREAD_EXECUTOR);
    +          }
    +
    +          @Override
    +          public void onFailure(Throwable t) {
    +            createFailure.addSuppressed(t);
    +            resultFuture.setException(createFailure);
    +          }
    +        }, Threads.SAME_THREAD_EXECUTOR);
    +      }
    +    }, Threads.SAME_THREAD_EXECUTOR);
    +
    +    return resultFuture;
    --- End diff --
    
    I was thinking it does not have to be async. Create the ZK node and get the future. If it succeeds, done. If not, delete the ZK node and get the future. If that fails, throw; otherwise, try again. But maybe that would be equally complex?
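A minimal sketch of that blocking variant, under stated assumptions: `Store` is a hypothetical synchronous store whose `create`/`delete` throw the hypothetical `NodeExistsException`/`NoNodeException`. With Twill's `ZKClient` one would instead block on each returned future. The sketch caps the number of create attempts and treats a node that is already gone at delete time as harmless, simply retrying the create.

```java
import java.util.HashMap;
import java.util.Map;

public class SyncCreateOrReplace {

  // Hypothetical stand-ins for ZooKeeper's NodeExists / NoNode errors.
  static class NodeExistsException extends Exception {
    NodeExistsException(String path) { super("Node exists: " + path); }
  }

  static class NoNodeException extends Exception {
    NoNodeException(String path) { super("No node: " + path); }
  }

  // Hypothetical blocking store; with a real async client each call would block on its future.
  static class Store {
    final Map<String, String> nodes = new HashMap<>();

    String create(String path, String data) throws NodeExistsException {
      if (nodes.containsKey(path)) {
        throw new NodeExistsException(path);
      }
      nodes.put(path, data);
      return path;
    }

    void delete(String path) throws NoNodeException {
      if (nodes.remove(path) == null) {
        throw new NoNodeException(path);
      }
    }
  }

  // Create the node; if it exists, delete it and retry, giving up after maxAttempts creates.
  static String createOrReplace(Store store, String path, String data, int maxAttempts) {
    NodeExistsException lastFailure = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return store.create(path, data);  // success: done
      } catch (NodeExistsException e) {
        lastFailure = e;
      }
      try {
        store.delete(path);  // remove the left-over node
      } catch (NoNodeException e) {
        // Already gone, so simply retry the create.
      }
    }
    throw new IllegalStateException("Failed to create " + path, lastFailure);
  }

  public static void main(String[] args) {
    Store store = new Store();
    store.nodes.put("/demo/live", "stale");
    String path = createOrReplace(store, "/demo/live", "fresh", 3);
    System.out.println(path + " -> " + store.nodes.get(path));
  }
}
```

The trade-off is that the calling thread blocks for every ZK round trip, in exchange for control flow that reads top to bottom.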


---

[GitHub] twill issue #67: (TWILL-61) Fix to allow higher attempts to relaunch the app...

Posted by chtyim <gi...@git.apache.org>.
Github user chtyim commented on the issue:

    https://github.com/apache/twill/pull/67
  
    I've addressed the comments and fixed one more issue: on deletion failure, if the node does not exist, we can just go ahead and recreate the node instead of failing.
    
    Also refactored the callback code a bit to try to make it cleaner.


---