You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:29:04 UTC

[26/50] [abbrv] git commit: ACCUMULO-2212 Add ZooReaderWriterFactory

ACCUMULO-2212 Add ZooReaderWriterFactory

The o.a.a.server.zookeeper.ZooReaderWriter class is mostly copied into a new
ZooReaderWriterFactory class. Users of the factory can work directly with FATE
ZooReaderWriter instances instead of instances of the server ZooReaderWriter type,
which is no longer needed. The logic for building instances from site configuration
data now lives in the factory.

As part of this change, the invocation handler used to retry ZK calls on connection
loss is refactored into a new RetryingInvocationHandler.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/484491d2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/484491d2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/484491d2

Branch: refs/heads/ACCUMULO-378
Commit: 484491d21abf84240912209ae16e33639cbfa595
Parents: cdcc1d4
Author: Bill Havanki <bh...@cloudera.com>
Authored: Wed Apr 9 16:02:41 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Mon May 5 10:40:06 2014 -0400

----------------------------------------------------------------------
 .../zookeeper/RetryingInvocationHandler.java    | 63 ++++++++++++++
 .../fate/zookeeper/ZooReaderWriter.java         | 24 +-----
 .../RetryingInvocationHandlerTest.java          | 87 ++++++++++++++++++++
 .../zookeeper/ZooReaderWriterFactory.java       | 86 +++++++++++++++++++
 4 files changed, 238 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/484491d2/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
new file mode 100644
index 0000000..4597036
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * An invocation handler for ZooKeeper reader/writers that retries calls that fail due to connection loss, waiting longer between successive attempts up to a fixed cap. Any other failure is rethrown to the caller.
+ */
+public class RetryingInvocationHandler implements InvocationHandler {
+  private final IZooReaderWriter zrw;
+
+  /**
+   * Creates a new invocation handler.
+   *
+   * @param zrw
+   *          ZooKeeper reader/writer being handled; every intercepted call is forwarded to it
+   */
+  public RetryingInvocationHandler(IZooReaderWriter zrw) {
+    this.zrw = zrw;
+  }
+  // Retry wait schedule, in milliseconds: the first wait, the amount added after each failed attempt, and the upper bound on the wait.
+  private static final long INITIAL_RETRY_TIME = 250L;
+  private static final long RETRY_INCREMENT = 250L;
+  private static final long MAXIMUM_RETRY_TIME = 5000L;
+  // Forwards the call to the wrapped reader/writer (obj, the proxy, is not used) and loops until the call returns or fails with something other than connection loss.
+  @Override
+  public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+    long retryTime = INITIAL_RETRY_TIME;
+    while (true) {
+      try {
+        return method.invoke(zrw, args);
+      } catch (InvocationTargetException e) { // reflection wraps whatever the target method threw
+        if (e.getCause() instanceof KeeperException.ConnectionLossException) {
+          Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + String.format("%.2f secs", retryTime / 1000.0), e.getCause());
+          UtilWaitThread.sleep(retryTime);
+          retryTime = Math.min(MAXIMUM_RETRY_TIME, retryTime + RETRY_INCREMENT); // grow the wait by a fixed step, never past the cap
+        } else {
+          throw e.getCause(); // surface the original exception rather than the reflection wrapper
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/484491d2/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
index 24c1d85..16f7833 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
@@ -154,28 +154,8 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
   public static synchronized IZooReaderWriter getRetryingInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
     
     if (retryingInstance == null) {
-      final IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, scheme, auth);
-      
-      InvocationHandler ih = new InvocationHandler() {
-        @Override
-        public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
-          long retryTime = 250;
-          while (true) {
-            try {
-              return method.invoke(inst, args);
-            } catch (InvocationTargetException e) {
-              if (e.getCause() instanceof KeeperException.ConnectionLossException) {
-                Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + String.format("%.2f secs", retryTime / 1000.0), e.getCause());
-                UtilWaitThread.sleep(retryTime);
-                retryTime = Math.min(5000, retryTime + 250);
-              } else {
-                throw e.getCause();
-              }
-            }
-          }
-        }
-      };
-      
+      IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, scheme, auth);
+      InvocationHandler ih = new RetryingInvocationHandler(inst);
       retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
     }
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/484491d2/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
new file mode 100644
index 0000000..0613a1f
--- /dev/null
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.lang.reflect.Method;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.easymock.EasyMock.aryEq;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class RetryingInvocationHandlerTest {
+  private static final String PATH = "/path/to/somewhere";
+  private static final byte[] DATA = {(byte) 1, (byte) 2};
+  private static final Object[] ARGS = {PATH, DATA}; // argument array handed to invoke(), matching putEphemeralData(PATH, DATA)
+  private static final String RV = "OK"; // value the mocked reader/writer returns on success
+  // Reflective handle for IZooReaderWriter.putEphemeralData(String, byte[]), the call driven through the handler in every test.
+  private static Method putMethod;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    putMethod = IZooReaderWriter.class.getMethod("putEphemeralData", String.class, byte[].class);
+  }
+
+  private IZooReaderWriter zrw;
+  private RetryingInvocationHandler ih;
+  // Builds a fresh mock reader/writer and a handler wrapping it before each test.
+  @Before
+  public void setUp() throws Exception {
+    zrw = createMock(IZooReaderWriter.class);
+    ih = new RetryingInvocationHandler(zrw);
+  }
+  // The wrapped call succeeds on the first attempt; the handler must hand back its return value.
+  @Test
+  public void testInvokeSuccessful() throws Throwable {
+    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andReturn(RV);
+    replay(zrw);
+    Object rv = ih.invoke(null, putMethod, ARGS);
+    verify(zrw);
+    assertEquals(RV, rv);
+  }
+  // Two connection losses followed by success: the handler must keep retrying and return the eventual value. NOTE(review): presumably waits for real between attempts - confirm.
+  @Test
+  public void testInvokeRetrySuccessful() throws Throwable {
+    ConnectionLossException e = createMock(ConnectionLossException.class);
+    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
+    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
+    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andReturn(RV);
+    replay(zrw);
+    Object rv = ih.invoke(null, putMethod, ARGS);
+    verify(zrw);
+    assertEquals(RV, rv);
+  }
+  // A connection loss is retried, but the InterruptedException thrown next is not a connection loss and must propagate to the caller.
+  @Test(expected = InterruptedException.class)
+  public void testInvokeRetryFailure() throws Throwable {
+    ConnectionLossException e = createMock(ConnectionLossException.class);
+    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
+    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(new InterruptedException());
+    replay(zrw);
+    try {
+      ih.invoke(null, putMethod, ARGS);
+    } finally {
+      verify(zrw); // still check the mock's expectations even though invoke() throws
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/484491d2/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
new file mode 100644
index 0000000..3c8d143
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.RetryingInvocationHandler;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+
+/**
+ * A factory for {@link ZooReaderWriter} objects. The site-configured instances are held in static fields, so they are shared across all factory objects.
+ */
+public class ZooReaderWriterFactory {
+  private static final String SCHEME = "digest";
+  private static final String USER = "accumulo";
+  private static IZooReaderWriter instance = null; // created on the first getInstance() call, under the class lock
+  private static IZooReaderWriter retryingInstance = null; // created on the first getRetryingInstance() call, under the class lock
+
+  /**
+   * Gets a new reader/writer. A new object is built on every call, passing the {@code digest} scheme together with the UTF-8 bytes of {@code "accumulo:" + secret}.
+   *
+   * @param string
+   *          ZooKeeper connection string
+   * @param timeInMillis
+   *          session timeout in milliseconds
+   * @param secret
+   *          instance secret
+   * @return reader/writer
+   */
+  public IZooReaderWriter getZooReaderWriter(String string, int timeInMillis, String secret) {
+    return new ZooReaderWriter(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes(Constants.UTF8));
+  }
+
+  /**
+   * Gets a reader/writer, retrieving ZooKeeper information from the site configuration. The same instance may be returned for multiple calls: it is created on the first call and reused afterwards.
+   *
+   * @return reader/writer
+   */
+  public IZooReaderWriter getInstance() {
+    synchronized (ZooReaderWriterFactory.class) {
+      if (instance == null) {
+        AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
+        instance = getZooReaderWriter(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+            conf.get(Property.INSTANCE_SECRET));
+      }
+      return instance;
+    }
+  }
+
+  /**
+   * Gets a reader/writer, retrieving ZooKeeper information from the site configuration, and that retries on connection loss. The same instance may be returned
+   * for multiple calls. The result is a dynamic proxy over {@link #getInstance()} whose calls are routed through a {@link RetryingInvocationHandler}.
+   *
+   * @return retrying reader/writer
+   */
+  public IZooReaderWriter getRetryingInstance() {
+    synchronized (ZooReaderWriterFactory.class) {
+      if (retryingInstance == null) {
+        IZooReaderWriter inst = getInstance(); // re-enters the class lock already held here
+        InvocationHandler ih = new RetryingInvocationHandler(inst);
+        retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(IZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
+      }
+      return retryingInstance;
+    }
+  }
+}