You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/03/11 01:08:29 UTC

hbase git commit: HBASE-14123 HBase Backup/Restore Phase 2 (Vladimir Rodionov) - remove ZKProcedureCoordinatorRpcs

Repository: hbase
Updated Branches:
  refs/heads/HBASE-14123 3aaea8e04 -> 063f539c7


HBASE-14123 HBase Backup/Restore Phase 2 (Vladimir Rodionov) - remove ZKProcedureCoordinatorRpcs


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/063f539c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/063f539c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/063f539c

Branch: refs/heads/HBASE-14123
Commit: 063f539c7bb5a92074c370f1144d91eae7a549e8
Parents: 3aaea8e
Author: tedyu <yu...@gmail.com>
Authored: Fri Mar 10 17:08:20 2017 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Fri Mar 10 17:08:20 2017 -0800

----------------------------------------------------------------------
 .../procedure/ZKProcedureCoordinatorRpcs.java   | 328 -------------------
 1 file changed, 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/063f539c/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
deleted file mode 100644
index b656894..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.procedure;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
- */
-@InterfaceAudience.Private
-public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs {
-  private static final Log LOG = LogFactory.getLog(ZKProcedureCoordinator.class);
-  private ZKProcedureUtil zkProc = null;
-  protected ProcedureCoordinator coordinator = null;  // if started this should be non-null
-
-  ZooKeeperWatcher watcher;
-  String procedureType;
-  String coordName;
-
-  /**
-   * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
-   * @param procedureClass procedure type name is a category for when there are multiple kinds of
-   *    procedures.-- this becomes a znode so be aware of the naming restrictions
-   * @param coordName name of the node running the coordinator
-   * @throws KeeperException if an unexpected zk error occurs
-   */
-  public ZKProcedureCoordinator(ZooKeeperWatcher watcher,
-      String procedureClass, String coordName) {
-    this.watcher = watcher;
-    this.procedureType = procedureClass;
-    this.coordName = coordName;
-  }
-
-  /**
-   * The "acquire" phase.  The coordinator creates a new procType/acquired/ znode dir. If znodes
-   * appear, first acquire to relevant listener or sets watch waiting for notification of
-   * the acquire node
-   *
-   * @param proc the Procedure
-   * @param info data to be stored in the acquire node
-   * @param nodeNames children of the acquire phase
-   * @throws IOException if any failure occurs.
-   */
-  @Override
-  final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
-      throws IOException, IllegalArgumentException {
-    String procName = proc.getName();
-    // start watching for the abort node
-    String abortNode = zkProc.getAbortZNode(procName);
-    try {
-      // check to see if the abort node already exists
-      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
-        abort(abortNode);
-      }
-      // If we get an abort node watch triggered here, we'll go complete creating the acquired
-      // znode but then handle the acquire znode and bail out
-    } catch (KeeperException e) {
-      String msg = "Failed while watching abort node:" + abortNode;
-      LOG.error(msg, e);
-      throw new IOException(msg, e);
-    }
-
-    // create the acquire barrier
-    String acquire = zkProc.getAcquiredBarrierNode(procName);
-    LOG.debug("Creating acquire znode:" + acquire);
-    try {
-      // notify all the procedure listeners to look for the acquire node
-      byte[] data = ProtobufUtil.prependPBMagic(info);
-      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
-      // loop through all the children of the acquire phase and watch for them
-      for (String node : nodeNames) {
-        String znode = ZKUtil.joinZNode(acquire, node);
-        LOG.debug("Watching for acquire node:" + znode);
-        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
-          coordinator.memberAcquiredBarrier(procName, node);
-        }
-      }
-    } catch (KeeperException e) {
-      String msg = "Failed while creating acquire node:" + acquire;
-      LOG.error(msg, e);
-      throw new IOException(msg, e);
-    }
-  }
-
-  @Override
-  public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
-    String procName = proc.getName();
-    String reachedNode = zkProc.getReachedBarrierNode(procName);
-    LOG.debug("Creating reached barrier zk node:" + reachedNode);
-    try {
-      // create the reached znode and watch for the reached znodes
-      ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
-      // loop through all the children of the acquire phase and watch for them
-      for (String node : nodeNames) {
-        String znode = ZKUtil.joinZNode(reachedNode, node);
-        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
-          byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
-          // ProtobufUtil.isPBMagicPrefix will check null
-          if (dataFromMember != null && dataFromMember.length > 0) {
-            if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
-              String msg =
-                "Failed to get data from finished node or data is illegally formatted: " + znode;
-              LOG.error(msg);
-              throw new IOException(msg);
-            } else {
-              dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
-                dataFromMember.length);
-              coordinator.memberFinishedBarrier(procName, node, dataFromMember);
-            }
-          } else {
-            coordinator.memberFinishedBarrier(procName, node, dataFromMember);
-          }
-        }
-      }
-    } catch (KeeperException e) {
-      String msg = "Failed while creating reached node:" + reachedNode;
-      LOG.error(msg, e);
-      throw new IOException(msg, e);
-    } catch (InterruptedException e) {
-      String msg = "Interrupted while creating reached node:" + reachedNode;
-      LOG.error(msg, e);
-      throw new InterruptedIOException(msg);
-    }
-  }
-
-
-  /**
-   * Delete znodes that are no longer in use.
-   */
-  @Override
-  final public void resetMembers(Procedure proc) throws IOException {
-    String procName = proc.getName();
-    boolean stillGettingNotifications = false;
-    do {
-      try {
-        LOG.debug("Attempting to clean out zk node for op:" + procName);
-        zkProc.clearZNodes(procName);
-        stillGettingNotifications = false;
-      } catch (KeeperException.NotEmptyException e) {
-        // recursive delete isn't transactional (yet) so we need to deal with cases where we get
-        // children trickling in
-        stillGettingNotifications = true;
-      } catch (KeeperException e) {
-        String msg = "Failed to complete reset procedure " + procName;
-        LOG.error(msg, e);
-        throw new IOException(msg, e);
-      }
-    } while (stillGettingNotifications);
-  }
-
-  /**
-   * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
-   * @return true if succeed, false if encountered initialization errors.
-   */
-  @Override
-  final public boolean start(final ProcedureCoordinator coordinator) {
-    if (this.coordinator != null) {
-      throw new IllegalStateException(
-        "ZKProcedureCoordinator already started and already has listener installed");
-    }
-    this.coordinator = coordinator;
-
-    try {
-      this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
-        @Override
-        public void nodeCreated(String path) {
-          if (!isInProcedurePath(path)) return;
-          LOG.debug("Node created: " + path);
-          logZKTree(this.baseZNode);
-          if (isAcquiredPathNode(path)) {
-            // node wasn't present when we created the watch so zk event triggers acquire
-            coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
-              ZKUtil.getNodeName(path));
-          } else if (isReachedPathNode(path)) {
-            // node was absent when we created the watch so zk event triggers the finished barrier.
-
-            // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
-            String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
-            String member = ZKUtil.getNodeName(path);
-            // get the data from the procedure member
-            try {
-              byte[] dataFromMember = ZKUtil.getData(watcher, path);
-              // ProtobufUtil.isPBMagicPrefix will check null
-              if (dataFromMember != null && dataFromMember.length > 0) {
-                if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
-                  ForeignException ee = new ForeignException(coordName,
-                    "Failed to get data from finished node or data is illegally formatted:"
-                        + path);
-                  coordinator.abortProcedure(procName, ee);
-                } else {
-                  dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
-                    dataFromMember.length);
-                  LOG.debug("Finished data from procedure '" + procName
-                    + "' member '" + member + "': " + new String(dataFromMember));
-                  coordinator.memberFinishedBarrier(procName, member, dataFromMember);
-                }
-              } else {
-                coordinator.memberFinishedBarrier(procName, member, dataFromMember);
-              }
-            } catch (KeeperException e) {
-              ForeignException ee = new ForeignException(coordName, e);
-              coordinator.abortProcedure(procName, ee);
-            } catch (InterruptedException e) {
-              ForeignException ee = new ForeignException(coordName, e);
-              coordinator.abortProcedure(procName, ee);
-            }
-          } else if (isAbortPathNode(path)) {
-            abort(path);
-          } else {
-            LOG.debug("Ignoring created notification for node:" + path);
-          }
-        }
-      };
-      zkProc.clearChildZNodes();
-    } catch (KeeperException e) {
-      LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
-      return false;
-    }
-
-    LOG.debug("Starting the controller for procedure member:" + coordName);
-    return true;
-  }
-
-  /**
-   * This is the abort message being sent by the coordinator to member
-   *
-   * TODO this code isn't actually used but can be used to issue a cancellation from the
-   * coordinator.
-   */
-  @Override
-  final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
-    String procName = proc.getName();
-    LOG.debug("Aborting procedure '" + procName + "' in zk");
-    String procAbortNode = zkProc.getAbortZNode(procName);
-    try {
-      LOG.debug("Creating abort znode:" + procAbortNode);
-      String source = (ee.getSource() == null) ? coordName : ee.getSource();
-      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
-      // first create the znode for the procedure
-      ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
-      LOG.debug("Finished creating abort node:" + procAbortNode);
-    } catch (KeeperException e) {
-      // possible that we get this error for the procedure if we already reset the zk state, but in
-      // that case we should still get an error for that procedure anyways
-      zkProc.logZKTree(zkProc.baseZNode);
-      coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
-          + " to abort procedure '" + procName + "'", new IOException(e));
-    }
-  }
-
-  /**
-   * Receive a notification and propagate it to the local coordinator
-   * @param abortNode full znode path to the failed procedure information
-   */
-  protected void abort(String abortNode) {
-    String procName = ZKUtil.getNodeName(abortNode);
-    ForeignException ee = null;
-    try {
-      byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
-      if (data == null || data.length == 0) {
-        // ignore
-        return;
-      } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
-        LOG.warn("Got an error notification for op:" + abortNode
-            + " but we can't read the information. Killing the procedure.");
-        // we got a remote exception, but we can't describe it
-        ee = new ForeignException(coordName,
-          "Data in abort node is illegally formatted.  ignoring content.");
-      } else {
-
-        data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
-        ee = ForeignException.deserialize(data);
-      }
-    } catch (IOException e) {
-      LOG.warn("Got an error notification for op:" + abortNode
-          + " but we can't read the information. Killing the procedure.");
-      // we got a remote exception, but we can't describe it
-      ee = new ForeignException(coordName, e);
-    } catch (KeeperException e) {
-      coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
-          + zkProc.getAbortZnode(), new IOException(e));
-    } catch (InterruptedException e) {
-      coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
-          + zkProc.getAbortZnode(), new IOException(e));
-      Thread.currentThread().interrupt();
-    }
-    coordinator.abortProcedure(procName, ee);
-  }
-
-  @Override
-  final public void close() throws IOException {
-    zkProc.close();
-  }
-
-  /**
-   * Used in testing
-   */
-  final ZKProcedureUtil getZkProcedureUtil() {
-    return zkProc;
-  }
-}