You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/09/29 20:50:27 UTC

[11/12] accumulo git commit: ACCUMULO-4012 possible infinite loop when the base transaction is removed before scanning it

ACCUMULO-4012 Avoid a possible infinite loop in ZooStore.top() when the transaction's base node is removed before it is scanned


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d301f4e2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d301f4e2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d301f4e2

Branch: refs/heads/1.7
Commit: d301f4e29361522dab8a8e036a22553d62daa0a6
Parents: 7f50f60
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue Sep 29 14:41:30 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue Sep 29 14:41:30 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/fate/ZooStore.java | 27 ++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d301f4e2/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java b/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
index 72220f8..6f5ea70 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.fate;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -33,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -40,6 +42,8 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 //TODO use zoocache? - ACCUMULO-1297
 //TODO handle zookeeper being down gracefully - ACCUMULO-1297
@@ -47,6 +51,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
 
 public class ZooStore<T> implements TStore<T> {
 
+  private static final Logger log = LoggerFactory.getLogger(ZooStore.class);
   private String path;
   private IZooReaderWriter zk;
   private String lastReserved = "";
@@ -194,6 +199,7 @@ public class ZooStore<T> implements TStore<T> {
     }
   }
 
+  @Override
   public void reserve(long tid) {
     synchronized (this) {
       reservationsWaiting++;
@@ -249,26 +255,37 @@ public class ZooStore<T> implements TStore<T> {
     }
   }
 
+  private static final int RETRIES = 10;
+
   @SuppressWarnings("unchecked")
   @Override
   public Repo<T> top(long tid) {
     verifyReserved(tid);
 
-    while (true) {
+    for (int i = 0; i < RETRIES; i++) {
+      String txpath = getTXPath(tid);
       try {
-        String txpath = getTXPath(tid);
-        String top = findTop(txpath);
-        if (top == null)
-          return null;
+        String top;
+        try {
+          top = findTop(txpath);
+          if (top == null) {
+            return null;
+          }
+        } catch (KeeperException.NoNodeException ex) {
+          throw new RuntimeException(ex);
+        }
 
         byte[] ser = zk.getData(txpath + "/" + top, null);
         return (Repo<T>) deserialize(ser);
       } catch (KeeperException.NoNodeException ex) {
+        log.debug("zookeeper error reading " + txpath + ": " + ex.toString(), ex);
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
         continue;
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
     }
+    return null;
   }
 
   private String findTop(String txpath) throws KeeperException, InterruptedException {