You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/01/18 18:02:28 UTC

svn commit: r1435250 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase: client/replication/ replication/

Author: stack
Date: Fri Jan 18 17:02:28 2013
New Revision: 1435250

URL: http://svn.apache.org/viewvc?rev=1435250&view=rev
Log:
HBASE-7565 [replication] Create an interface for the replication state node

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1435250&r1=1435249&r2=1435250&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Fri Jan 18 17:02:28 2013
@@ -185,7 +185,7 @@ public class ReplicationAdmin implements
     boolean prev = true;
     try {
       prev = getReplicating();
-      this.replicationZk.setReplicating(newState);
+      this.replicationZk.setReplication(newState);
     } catch (KeeperException e) {
       throw new IOException("Unable to set the replication state", e);
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1435250&r1=1435249&r2=1435250&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Fri Jan 18 17:02:28 2013
@@ -90,7 +90,7 @@ public class ReplicationPeer implements 
   }
 
   private void readPeerStateZnode() throws DeserializationException {
-    this.peerEnabled.set(ReplicationZookeeper.isPeerEnabled(this.peerStateTracker.getData(false)));
+    this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
   }
 
   /**

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java?rev=1435250&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java Fri Jan 18 17:02:28 2013
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ReplicationStateImpl is responsible for maintaining the replication state
+ * znode.
+ */
+public class ReplicationStateImpl implements ReplicationStateInterface {
+
+  private ReplicationStateTracker stateTracker;
+  private final String stateZnode;
+  private final ZooKeeperWatcher zookeeper;
+  private final Abortable abortable;
+  private final AtomicBoolean replicating;
+
+  private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
+
+  public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode,
+      final Abortable abortable, final AtomicBoolean replicating) {
+    this.zookeeper = zk;
+    this.stateZnode = stateZnode;
+    this.abortable = abortable;
+    this.replicating = replicating;
+
+    // Set a tracker on replicationStateNode
+    this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode,
+        this.abortable);
+    stateTracker.start();
+    readReplicationStateZnode();
+  }
+
+  public boolean getState() throws KeeperException {
+    return getReplication();
+  }
+
+  public void setState(boolean newState) throws KeeperException {
+    setReplicating(newState);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (stateTracker != null) stateTracker.stop();
+  }
+
+  /**
+   * @param bytes
+   * @return True if the passed in <code>bytes</code> are those of a pb
+   *         serialized ENABLED state.
+   * @throws DeserializationException
+   */
+  private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+  }
+
+  /**
+   * @param bytes Content of a state znode.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    int pblen = ProtobufUtil.lengthOfPBMagic();
+    ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
+        .newBuilder();
+    ZooKeeperProtos.ReplicationState state;
+    try {
+      state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      return state.getState();
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * Set the new replication state for this cluster
+   * @param newState
+   */
+  private void setReplicating(boolean newState) throws KeeperException {
+    ZKUtil.createWithParents(this.zookeeper, this.stateZnode);
+    byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
+        : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
+    ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes);
+  }
+
+  /**
+   * Get the replication status of this cluster. If the state znode doesn't
+   * exist it will also create it and set it true.
+   * @return returns true when it's enabled, else false
+   * @throws KeeperException
+   */
+  private boolean getReplication() throws KeeperException {
+    byte[] data = this.stateTracker.getData(false);
+    if (data == null || data.length == 0) {
+      setReplicating(true);
+      return true;
+    }
+    try {
+      return isStateEnabled(data);
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  /**
+   * This reads the state znode for replication and sets the atomic boolean
+   */
+  private void readReplicationStateZnode() {
+    try {
+      this.replicating.set(getReplication());
+      LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed getting data on from " + this.stateZnode, e);
+    }
+  }
+
+  /**
+   * Tracker for status of the replication
+   */
+  private class ReplicationStateTracker extends ZooKeeperNodeTracker {
+    public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
+      super(watcher, stateZnode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+        readReplicationStateZnode();
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java?rev=1435250&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java Fri Jan 18 17:02:28 2013
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.Closeable;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This provides an interface for getting and setting the replication state of a
+ * cluster. This state is used to indicate whether replication is enabled or
+ * disabled on a cluster.
+ */
+public interface ReplicationStateInterface extends Closeable {
+	
+  /**
+   * Get the current state of replication (i.e. ENABLED or DISABLED).
+   * 
+   * @return true if replication is enabled, false otherwise
+   * @throws KeeperException
+   */
+  public boolean getState() throws KeeperException;
+	
+  /**
+   * Set the state of replication.
+   * 
+   * @param newState
+   * @throws KeeperException
+   */
+  public void setState(boolean newState) throws KeeperException;
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1435250&r1=1435249&r2=1435250&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Fri Jan 18 17:02:28 2013
@@ -43,12 +43,10 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -87,7 +85,7 @@ import com.google.protobuf.InvalidProtoc
  * </pre>
  */
 @InterfaceAudience.Private
-public class ReplicationZookeeper implements Closeable{
+public class ReplicationZookeeper implements Closeable {
   private static final Log LOG =
     LogFactory.getLog(ReplicationZookeeper.class);
   // Name of znode we use to lock when failover
@@ -111,24 +109,24 @@ public class ReplicationZookeeper implem
   // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
   private String peerStateNodeName;
   private final Configuration conf;
-  // Is this cluster replicating at the moment?
-  private AtomicBoolean replicating;
   // The key to our own cluster
   private String ourClusterKey;
   // Abortable
   private Abortable abortable;
-  private ReplicationStatusTracker statusTracker;
+  private final ReplicationStateInterface replicationState;
 
   /**
    * ZNode content if enabled state.
    */
   // Public so it can be seen by test code.
-  public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+  public static final byte[] ENABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
 
   /**
    * ZNode content if disabled state.
    */
-  static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+  static final byte[] DISABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
 
   /**
    * Constructor used by clients of replication (like master and HBase clients)
@@ -140,8 +138,9 @@ public class ReplicationZookeeper implem
       final ZooKeeperWatcher zk) throws KeeperException {
     this.conf = conf;
     this.zookeeper = zk;
-    this.replicating = new AtomicBoolean();
     setZNodes(abortable);
+    this.replicationState =
+        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
   }
 
   /**
@@ -157,9 +156,10 @@ public class ReplicationZookeeper implem
     this.abortable = server;
     this.zookeeper = server.getZooKeeper();
     this.conf = server.getConfiguration();
-    this.replicating = replicating;
     setZNodes(server);
 
+    this.replicationState =
+        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
     this.peerClusters = new HashMap<String, ReplicationPeer>();
     ZKUtil.createWithParents(this.zookeeper,
         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
@@ -180,11 +180,6 @@ public class ReplicationZookeeper implem
     ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
     ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
-
-    // Set a tracker on replicationStateNodeNode
-    this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
-    statusTracker.start();
-    readReplicationStateZnode();
   }
 
   private void connectExistingPeers() throws IOException, KeeperException {
@@ -366,18 +361,6 @@ public class ReplicationZookeeper implem
   }
 
   /**
-   * Set the new replication state for this cluster
-   * @param newState
-   */
-  public void setReplicating(boolean newState) throws KeeperException {
-    ZKUtil.createWithParents(this.zookeeper,
-        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
-    byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
-    ZKUtil.setData(this.zookeeper,
-      ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
-  }
-
-  /**
    * Remove the peer from zookeeper. which will trigger the watchers on every
    * region server and close their sources
    * @param id
@@ -641,40 +624,27 @@ public class ReplicationZookeeper implem
     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
   }
 
-  /**
-   * This reads the state znode for replication and sets the atomic boolean
-   */
-  private void readReplicationStateZnode() {
-    try {
-      this.replicating.set(getReplication());
-      LOG.info("Replication is now " + (this.replicating.get()?
-        "started" : "stopped"));
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
-    }
+  private String getRepStateNode() {
+    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
   }
 
   /**
-   * Get the replication status of this cluster. If the state znode doesn't
-   * exist it will also create it and set it true.
+   * Get the replication status of this cluster. If the state znode doesn't exist it will also
+   * create it and set it true.
    * @return returns true when it's enabled, else false
    * @throws KeeperException
    */
   public boolean getReplication() throws KeeperException {
-    byte [] data = this.statusTracker.getData(false);
-    if (data == null || data.length == 0) {
-      setReplicating(true);
-      return true;
-    }
-    try {
-      return isPeerEnabled(data);
-    } catch (DeserializationException e) {
-      throw ZKUtil.convert(e);
-    }
+    return this.replicationState.getState();
   }
 
-  private String getRepStateNode() {
-    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+  /**
+   * Set the new replication state for this cluster
+   * @param newState
+   * @throws KeeperException
+   */
+  public void setReplication(boolean newState) throws KeeperException {
+    this.replicationState.setState(newState);
   }
 
   /**
@@ -1055,8 +1025,7 @@ public class ReplicationZookeeper implem
 
   @Override
   public void close() throws IOException {
-    if (statusTracker != null)
-      statusTracker.stop();
+    if (replicationState != null) replicationState.close();
   }
 
   /**
@@ -1086,26 +1055,8 @@ public class ReplicationZookeeper implem
    *         serialized ENABLED state.
    * @throws DeserializationException
    */
-  static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
+  static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
     ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
     return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
   }
-
-  /**
-   * Tracker for status of the replication
-   */
-  public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
-    public ReplicationStatusTracker(ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, getRepStateNode(), abortable);
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-        readReplicationStateZnode();
-      }
-    }
-  }
 }