You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/18 01:42:43 UTC

git commit: ACCUMULO-2918 Pay the penalty of embedding ZK hierarchy manipulation to avoid creating an upgrade path.

Repository: accumulo
Updated Branches:
  refs/heads/master d2dc998c7 -> 672801c15


ACCUMULO-2918 Pay the penalty of embedding ZK hierarchy manipulation to avoid creating an upgrade path.

In a fresh 1.7.0 instance, this isn't an issue because {{accumulo init}} will create the necessary
parent nodes in ZooKeeper. However, when upgrading from 1.6, or from a 1.7 install that was created
before the replication code was merged, the tserver will error out because the parent ZK nodes
do not exist (and ZK does not create missing parent nodes automatically).

While it's not ideal to embed this logic in the server processes, it's better than creating
an upgrade path, since one does not yet exist.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/672801c1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/672801c1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/672801c1

Branch: refs/heads/master
Commit: 672801c154362a1aa870ae52439e5f9b81f42c04
Parents: d2dc998
Author: Josh Elser <el...@apache.org>
Authored: Tue Jun 17 16:25:09 2014 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Tue Jun 17 16:25:09 2014 -0700

----------------------------------------------------------------------
 .../replication/ZooKeeperInitialization.java    | 46 +++++++++++
 .../ZooKeeperInitializationTest.java            | 70 ++++++++++++++++
 .../java/org/apache/accumulo/master/Master.java |  7 +-
 .../monitor/servlets/ReplicationServlet.java    | 87 +++++++++++---------
 .../apache/accumulo/tserver/TabletServer.java   | 10 +++
 5 files changed, 178 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java
new file mode 100644
index 0000000..9e0a5a6
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Creates the replication ZooKeeper nodes from the server processes, so no 1.7 upgrade step is needed just for them.
+ * <p>
+ * Embedding this logic in the server processes is a small cost, but it relieves users/developers of having
+ * to manage this ZooKeeper state themselves.
+ */
+public class ZooKeeperInitialization {
+  /**
+   * Ensures that the full paths to the replication tserver and work-queue nodes exist, creating any that are missing.
+   * @param zooReaderWriter client used to check for and create the nodes
+   * @param zRoot ZooKeeper root of this instance (e.g. ZooUtil.getRoot(instance)); the replication paths are appended to it
+   * @throws KeeperException if a ZooKeeper operation fails
+   * @throws InterruptedException if interrupted while communicating with ZooKeeper
+   */
+  public static void ensureZooKeeperInitialized(final ZooReaderWriter zooReaderWriter, final String zRoot) throws KeeperException, InterruptedException {
+    if (!zooReaderWriter.exists(zRoot + ReplicationConstants.ZOO_TSERVERS, null)) {
+      zooReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_TSERVERS);
+    }
+
+    if (!zooReaderWriter.exists(zRoot + ReplicationConstants.ZOO_WORK_QUEUE, null)) {
+      zooReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_WORK_QUEUE);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java
new file mode 100644
index 0000000..25e7bc8
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link ZooKeeperInitialization} creates the replication parent nodes only when they are missing.
+ */
+public class ZooKeeperInitializationTest {
+
+  @Test
+  public void parentNodesAreCreatedWhenMissing() throws Exception {
+    ZooReaderWriter zReaderWriter = createMock(ZooReaderWriter.class);
+    String zRoot = "/accumulo";
+
+    expect(zReaderWriter.exists(zRoot + ReplicationConstants.ZOO_TSERVERS, null)).andReturn(false).once();
+    zReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_TSERVERS);
+    expectLastCall().once();
+
+    expect(zReaderWriter.exists(zRoot + ReplicationConstants.ZOO_WORK_QUEUE, null)).andReturn(false).once();
+    zReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_WORK_QUEUE);
+    expectLastCall().once();
+
+    replay(zReaderWriter);
+
+    ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zRoot);
+
+    verify(zReaderWriter);
+  }
+
+  @Test
+  public void parentNodesAreNotRecreatedWhenAlreadyExist() throws Exception {
+    ZooReaderWriter zReaderWriter = createMock(ZooReaderWriter.class);
+    String zRoot = "/accumulo";
+
+    expect(zReaderWriter.exists(zRoot + ReplicationConstants.ZOO_TSERVERS, null)).andReturn(true).once();
+
+    expect(zReaderWriter.exists(zRoot + ReplicationConstants.ZOO_WORK_QUEUE, null)).andReturn(true).once();
+
+    replay(zReaderWriter);
+
+    ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zRoot);
+
+    verify(zReaderWriter);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index a722008..820db5d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -106,6 +106,7 @@ import org.apache.accumulo.server.master.state.TabletMigration;
 import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.accumulo.server.master.state.ZooStore;
 import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.replication.ZooKeeperInitialization;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.security.SystemCredentials;
@@ -935,7 +936,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
 
     tserverSet.startListeningForTabletServerChanges();
 
-    ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
+    ZooReaderWriter zReaderWriter = ZooReaderWriter.getInstance();
+
+    zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
         nextEvent.event("Noticed recovery changes", event.getType());
@@ -981,6 +984,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
       throw new IOException(e);
     }
 
+    ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
+
     Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this)));
     ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
         "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 2fcd67d..563fb6d 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -57,6 +57,7 @@ import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -210,56 +211,60 @@ public class ReplicationServlet extends BasicServlet {
 
     DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst));
 
-    for (String queueKey : workQueue.getWorkQueued()) {
-      Entry<String,ReplicationTarget> queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey);
-      String filename = queueKeyPair.getKey();
-      ReplicationTarget target = queueKeyPair.getValue();
-
-      byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
-
-      // We could try to grep over the table, but without knowing the full file path, we
-      // can't find the status quickly
-      String status = "Unknown";
-      String path = null;
-      if (null != data) {
-        path = new String(data);
-        Scanner s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY);
-        s.setRange(Range.exact(path));
-        s.fetchColumn(WorkSection.NAME, target.toText());
+    try {
+      for (String queueKey : workQueue.getWorkQueued()) {
+        Entry<String,ReplicationTarget> queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey);
+        String filename = queueKeyPair.getKey();
+        ReplicationTarget target = queueKeyPair.getValue();
   
-        // Fetch the work entry for this item
-        Entry<Key,Value> kv = null;
-        try {
-          kv = Iterables.getOnlyElement(s);
-        } catch (NoSuchElementException e) {
-         log.trace("Could not find status of {} replicating to {}", filename, target);
-         status = "Unknown";
-        } finally {
-          s.close();
-        }
+        byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
   
-        // If we found the work entry for it, try to compute some progress
-        if (null != kv) {
+        // We could try to grep over the table, but without knowing the full file path, we
+        // can't find the status quickly
+        String status = "Unknown";
+        String path = null;
+        if (null != data) {
+          path = new String(data);
+          Scanner s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY);
+          s.setRange(Range.exact(path));
+          s.fetchColumn(WorkSection.NAME, target.toText());
+    
+          // Fetch the work entry for this item
+          Entry<Key,Value> kv = null;
           try {
-            Status stat = Status.parseFrom(kv.getValue().get());
-            if (StatusUtil.isFullyReplicated(stat)) {
-              status = "Finished";
-            } else {
-              if (stat.getInfiniteEnd()) {
-                status = stat.getBegin() + "/&infin; records";
+            kv = Iterables.getOnlyElement(s);
+          } catch (NoSuchElementException e) {
+           log.trace("Could not find status of {} replicating to {}", filename, target);
+           status = "Unknown";
+          } finally {
+            s.close();
+          }
+    
+          // If we found the work entry for it, try to compute some progress
+          if (null != kv) {
+            try {
+              Status stat = Status.parseFrom(kv.getValue().get());
+              if (StatusUtil.isFullyReplicated(stat)) {
+                status = "Finished";
               } else {
-                status = stat.getBegin() + "/" + stat.getEnd() + " records";
+                if (stat.getInfiniteEnd()) {
+                  status = stat.getBegin() + "/&infin; records";
+                } else {
+                  status = stat.getBegin() + "/" + stat.getEnd() + " records";
+                }
               }
+            } catch (InvalidProtocolBufferException e) {
+              log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
+              status = "Unknown";
             }
-          } catch (InvalidProtocolBufferException e) {
-            log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
-            status = "Unknown";
           }
         }
+  
+        // Add a row in the table
+        replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
       }
-
-      // Add a row in the table
-      replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Could not calculate replication in progress", e);
     }
 
     replicationInProgress.generate(req, sb);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index f67c51f..9e8af0a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -160,6 +160,7 @@ import org.apache.accumulo.server.master.state.TabletStateStore;
 import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.replication.ZooKeeperInitialization;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.security.SystemCredentials;
@@ -2340,6 +2341,15 @@ public class TabletServer implements Runnable {
   public void run() {
     SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 
+    // To make things easier on users/devs, and to avoid creating an upgrade path to 1.7
+    // We can just make the zookeeper paths before we try to use.
+    try {
+      ZooKeeperInitialization.ensureZooKeeperInitialized(ZooReaderWriter.getInstance(), ZooUtil.getRoot(getInstance()));
+    } catch (KeeperException | InterruptedException e) {
+      log.error("Could not ensure that ZooKeeper is properly initialized", e);
+      throw new RuntimeException(e);
+    }
+
     try {
       clientAddress = startTabletClientService();
     } catch (UnknownHostException e1) {