You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2014/10/20 23:38:05 UTC

svn commit: r1633234 - in /hive/branches/branch-0.14: ./ hcatalog/webhcat/svr/ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/

Author: ekoifman
Date: Mon Oct 20 21:38:05 2014
New Revision: 1633234

URL: http://svn.apache.org/r1633234
Log:
HIVE-8387 add retry logic to ZooKeeperStorage in WebHCat

Modified:
    hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml
    hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
    hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
    hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java
    hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
    hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
    hive/branches/branch-0.14/pom.xml

Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml Mon Oct 20 21:38:05 2014
@@ -38,7 +38,7 @@
   </properties>
 
   <dependencies>
-    <!-- dependencies are always listed in sorted order by groupId, artifectId -->
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
     <!-- intra-project -->
     <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
@@ -67,6 +67,14 @@
       <artifactId>commons-exec</artifactId>
       <version>${commons-exec.version}</version>
     </dependency>
+
+      
+      <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+       <version>${curator.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
@@ -190,6 +198,37 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>include-curator</id>
+            <!-- WebHCat uses the Curator library to work with ZooKeeper, so Curator must be
+            available on whichever cluster node LaunchMapper runs on to execute the job.
+            The simplest way is to bundle it into the webhcat jar, which is shipped to that node
+            because it contains LaunchMapper.java. -->
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <minimizeJar>true</minimizeJar>
+              <artifactSet>
+                <includes>
+                  <include>org.apache.curator</include>
+                </includes>
+              </artifactSet>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.curator</pattern>
+                  <shadedPattern>webhcat.org.apache.curator</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java Mon Oct 20 21:38:05 2014
@@ -25,9 +25,7 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -116,32 +114,6 @@ public class HDFSStorage implements Temp
   }
 
   @Override
-  public Map<String, String> getFields(Type type, String id) {
-    HashMap<String, String> map = new HashMap<String, String>();
-    BufferedReader in = null;
-    Path p = new Path(getPath(type) + "/" + id);
-    try {
-      for (FileStatus status : fs.listStatus(p)) {
-        in = new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
-        String line = null;
-        String val = "";
-        while ((line = in.readLine()) != null) {
-          if (!val.equals("")) {
-            val += "\n";
-          }
-          val += line;
-        }
-        map.put(status.getPath().getName(), val);
-      }
-    } catch (IOException e) {
-      LOG.trace("Couldn't find " + p);
-    } finally {
-      close(in);
-    }
-    return map;
-  }
-
-  @Override
   public boolean delete(Type type, String id) throws NotFoundException {
     Path p = new Path(getPath(type) + "/" + id);
     try {
@@ -153,14 +125,6 @@ public class HDFSStorage implements Temp
     return false;
   }
 
-  @Override
-  public List<String> getAll() {
-    ArrayList<String> allNodes = new ArrayList<String>();
-    for (Type type : Type.values()) {
-      allNodes.addAll(getAllForType(type));
-    }
-    return allNodes;
-  }
 
   @Override
   public List<String> getAllForType(Type type) {
@@ -177,40 +141,6 @@ public class HDFSStorage implements Temp
   }
 
   @Override
-  public List<String> getAllForKey(String key, String value) {
-    ArrayList<String> allNodes = new ArrayList<String>();
-    try {
-      for (Type type : Type.values()) {
-        allNodes.addAll(getAllForTypeAndKey(type, key, value));
-      }
-    } catch (Exception e) {
-      LOG.trace("Couldn't find children for key " + key + ": " +
-        e.getMessage());
-    }
-    return allNodes;
-  }
-
-  @Override
-  public List<String> getAllForTypeAndKey(Type type, String key, String value) {
-    ArrayList<String> allNodes = new ArrayList<String>();
-    HashMap<String, String> map = new HashMap<String, String>();
-    try {
-      for (FileStatus status :
-        fs.listStatus(new Path(getPath(type)))) {
-        map = (HashMap<String, String>)
-          getFields(type, status.getPath().getName());
-        if (map.get(key).equals(value)) {
-          allNodes.add(status.getPath().getName());
-        }
-      }
-    } catch (Exception e) {
-      LOG.trace("Couldn't find children for key " + key + ": " +
-        e.getMessage());
-    }
-    return allNodes;
-  }
-
-  @Override
   public void openStorage(Configuration config) throws IOException {
     storage_root = config.get(TempletonStorage.STORAGE_ROOT);
     if (fs == null) {

Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java Mon Oct 20 21:38:05 2014
@@ -24,19 +24,30 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
 
+/*
+ * The general idea here is to create
+ * /created/1
+ * /created/2
+ * /created/3 ....
+ * for each job submitted.  The node number is generated by ZK (PERSISTENT_SEQUENTIAL) and the
+ * payload is the JobId. Basically, this records the order in which jobs were submitted, and
+ * ZooKeeperCleanup uses it to purge old job info.
+ * Since the /jobs/<id> node already carries create/update timestamps
+ * (http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#sc_zkStatStructure), this whole
+ * tracking mechanism could be removed.
+*/
 public class JobStateTracker {
   // The path to the tracking root
   private String job_trackingroot = null;
 
   // The zookeeper connection to use
-  private ZooKeeper zk;
+  private CuratorFramework zk;
 
   // The id of the tracking node -- must be a SEQUENTIAL node
   private String trackingnode;
@@ -51,7 +62,7 @@ public class JobStateTracker {
    * Constructor for a new node -- takes the jobid of an existing job
    *
    */
-  public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker,
+  public JobStateTracker(String node, CuratorFramework zk, boolean nodeIsTracker,
                String job_trackingpath) {
     this.zk = zk;
     if (nodeIsTracker) {
@@ -65,30 +76,25 @@ public class JobStateTracker {
   /**
    * Create the parent znode for this job state.
    */
-  public void create()
-    throws IOException {
-    String[] paths = ZooKeeperStorage.getPaths(job_trackingroot);
-    for (String znode : paths) {
-      try {
-        zk.create(znode, new byte[0],
-          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      } catch (KeeperException.NodeExistsException e) {
-      } catch (Exception e) {
-        throw new IOException("Unable to create parent nodes");
-      }
+  public void create() throws IOException {
+    try {
+      zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+        .withACL(Ids.OPEN_ACL_UNSAFE).forPath(job_trackingroot);
+    } catch (KeeperException.NodeExistsException e) {
+      //root must exist already
+    } catch (Exception e) {
+      throw new IOException("Unable to create parent nodes");
     }
     try {
-      trackingnode = zk.create(makeTrackingZnode(), jobid.getBytes(),
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+      trackingnode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
+        .withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeTrackingZnode(), jobid.getBytes());
     } catch (Exception e) {
       throw new IOException("Unable to create " + makeTrackingZnode());
     }
   }
-
-  public void delete()
-    throws IOException {
+  public void delete() throws IOException {
     try {
-      zk.delete(makeTrackingJobZnode(trackingnode), -1);
+      zk.delete().forPath(makeTrackingJobZnode(trackingnode));
     } catch (Exception e) {
       // Might have been deleted already
       LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode));
@@ -101,13 +107,10 @@ public class JobStateTracker {
    */
   public String getJobID() throws IOException {
     try {
-      return new String(zk.getData(makeTrackingJobZnode(trackingnode),
-        false, new Stat()));
-    } catch (KeeperException e) {
+      return new String(zk.getData().forPath(makeTrackingJobZnode(trackingnode)));
+    } catch (Exception e) {
       // It was deleted during the transaction
-      throw new IOException("Node already deleted " + trackingnode);
-    } catch (InterruptedException e) {
-      throw new IOException("Couldn't read node " + trackingnode);
+      throw new IOException("Node already deleted " + trackingnode, e);
     }
   }
 
@@ -129,13 +132,13 @@ public class JobStateTracker {
    * Get the list of tracking jobs.  These can be used to determine which jobs have
    * expired.
    */
-  public static List<String> getTrackingJobs(Configuration conf, ZooKeeper zk)
+  public static List<String> getTrackingJobs(Configuration conf, CuratorFramework zk)
     throws IOException {
     ArrayList<String> jobs = new ArrayList<String>();
     try {
-      for (String myid : zk.getChildren(
+      for (String myid : zk.getChildren().forPath(
         conf.get(TempletonStorage.STORAGE_ROOT)
-          + ZooKeeperStorage.TRACKINGDIR, false)) {
+          + ZooKeeperStorage.TRACKINGDIR)) {
         jobs.add(myid);
       }
     } catch (Exception e) {

Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java Mon Oct 20 21:38:05 2014
@@ -20,7 +20,6 @@ package org.apache.hive.hcatalog.templet
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -45,7 +44,7 @@ import org.apache.hadoop.conf.Configurat
 public interface TempletonStorage {
   // These are the possible types referenced by 'type' below.
   public enum Type {
-    UNKNOWN, JOB, JOBTRACKING, TEMPLETONOVERHEAD
+    UNKNOWN, JOB, JOBTRACKING
   }
 
   public static final String STORAGE_CLASS    = "templeton.storage.class";
@@ -79,20 +78,6 @@ public interface TempletonStorage {
   public String getField(Type type, String id, String key);
 
   /**
-   * Get all the name/value pairs stored for this id.
-   * Be careful using getFields() -- optimistic locking will mean that
-   * your odds of a conflict are decreased if you read/write one field
-   * at a time.  getFields() is intended for read-only usage.
-   *
-   * If the type is UNKNOWN, search for the id in all types.
-   *
-   * @param type The data type (as listed above)
-   * @param id The String id of this data grouping (jobid, etc.)
-   * @return A Map of key/value pairs found for this type/id.
-   */
-  public Map<String, String> getFields(Type type, String id);
-
-  /**
    * Delete a data grouping (all data for a jobid, all tracking data
    * for a job, etc.).  If the type is UNKNOWN, search for the id
    * in all types.
@@ -105,13 +90,6 @@ public interface TempletonStorage {
   public boolean delete(Type type, String id) throws NotFoundException;
 
   /**
-   * Get the id of each data grouping in the storage system.
-   *
-   * @return An ArrayList<String> of ids.
-   */
-  public List<String> getAll();
-
-  /**
    * Get the id of each data grouping of a given type in the storage
    * system.
    * @param type The data type (as listed above)
@@ -120,26 +98,6 @@ public interface TempletonStorage {
   public List<String> getAllForType(Type type);
 
   /**
-   * Get the id of each data grouping that has the specific key/value
-   * pair.
-   * @param key The name of the field to search for
-   * @param value The value of the field to search for
-   * @return An ArrayList<String> of ids.
-   */
-  public List<String> getAllForKey(String key, String value);
-
-  /**
-   * Get the id of each data grouping of a given type that has the
-   * specific key/value pair.
-   * @param type The data type (as listed above)
-   * @param key The name of the field to search for
-   * @param value The value of the field to search for
-   * @return An ArrayList<String> of ids.
-   */
-  public List<String> getAllForTypeAndKey(Type type, String key,
-                      String value);
-
-  /**
    * For storage methods that require a connection, this is a hint
    * that it's time to open a connection.
    */

Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java Mon Oct 20 21:38:05 2014
@@ -24,8 +24,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Date;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.ZooKeeper;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -89,7 +89,7 @@ public class ZooKeeperCleanup extends Th
    * @throws IOException
    */
   public void run() {
-    ZooKeeper zk = null;
+    CuratorFramework zk = null;
     List<String> nodes = null;
     isRunning = true;
     while (!stop) {
@@ -112,13 +112,7 @@ public class ZooKeeperCleanup extends Th
         } catch (Exception e) {
           LOG.error("Cleanup cycle failed: " + e.getMessage());
         } finally {
-          if (zk != null) {
-            try {
-              zk.close();
-            } catch (InterruptedException e) {
-              // We're trying to exit anyway, just ignore.
-            }
-          }
+          if (zk != null) zk.close();
         }
 
         long sleepMillis = (long) (Math.random() * interval);
@@ -140,7 +134,7 @@ public class ZooKeeperCleanup extends Th
    *
    * @throws IOException
    */
-  public List<String> getChildList(ZooKeeper zk) {
+  public List<String> getChildList(CuratorFramework zk) {
     try {
       List<String> jobs = JobStateTracker.getTrackingJobs(appConf, zk);
       Collections.sort(jobs);
@@ -154,7 +148,7 @@ public class ZooKeeperCleanup extends Th
   /**
    * Check to see if a job is more than maxage old, and delete it if so.
    */
-  public boolean checkAndDelete(String node, ZooKeeper zk) {
+  public boolean checkAndDelete(String node, CuratorFramework zk) {
     JobState state = null;
     try {
       JobStateTracker tracker = new JobStateTracker(node, zk, true,
@@ -167,8 +161,11 @@ public class ZooKeeperCleanup extends Th
       // an error in creation, and we want to delete it anyway.
       long then = 0;
       if (state.getCreated() != null) {
+        //this is set in ZooKeeperStorage.create()
         then = state.getCreated();
       }
+      //todo: this should check that the job actually completed and likely use completion time
+      //which is not tracked directly but available on /jobs/<id> node via "mtime" in Stat
       if (now - then > maxage) {
         LOG.info("Deleting " + tracker.getJobID());
         state.delete();
@@ -177,7 +174,7 @@ public class ZooKeeperCleanup extends Th
       }
       return false;
     } catch (Exception e) {
-      LOG.info("checkAndDelete failed for " + node);
+      LOG.info("checkAndDelete failed for " + node + " due to: " + e.getMessage());
       // We don't throw a new exception for this -- just keep going with the
       // next one.
       return true;

Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java Mon Oct 20 21:38:05 2014
@@ -19,21 +19,18 @@
 package org.apache.hive.hcatalog.templeton.tool;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
 
 /**
  * A storage implementation based on storing everything in ZooKeeper.
@@ -60,29 +57,29 @@ public class ZooKeeperStorage implements
 
   private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class);
 
-  private ZooKeeper zk;
+  private CuratorFramework zk;
 
   /**
    * Open a ZooKeeper connection for the JobState.
    */
-  public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout)
+  public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
     throws IOException {
-    return new ZooKeeper(zkHosts,
-      zkSessionTimeout,
-      new Watcher() {
-        @Override
-        synchronized public void process(WatchedEvent event) {
-        }
-      });
+    //do we need to add a connection status listener?  What will that do?
+    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
+      CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
+    zk.start();
+    return zk;
   }
 
   /**
    * Open a ZooKeeper connection for the JobState.
    */
-  public static ZooKeeper zkOpen(Configuration conf)
-    throws IOException {
+  public static CuratorFramework zkOpen(Configuration conf) throws IOException {
+    /*the odd-looking call to Builder below obtains Curator's default session timeout, which
+    Curator itself exposes as a system property*/
     return zkOpen(conf.get(ZK_HOSTS),
-      conf.getInt(ZK_SESSION_TIMEOUT, 30000));
+      conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
   }
 
   public ZooKeeperStorage() {
@@ -93,15 +90,9 @@ public class ZooKeeperStorage implements
   /**
    * Close this ZK connection.
    */
-  public void close()
-    throws IOException {
+  public void close() throws IOException {
     if (zk != null) {
-      try {
-        zk.close();
-        zk = null;
-      } catch (InterruptedException e) {
-        throw new IOException("Closing ZooKeeper connection", e);
-      }
+      zk.close();
     }
   }
 
@@ -118,48 +109,54 @@ public class ZooKeeperStorage implements
    */
   public void create(Type type, String id)
     throws IOException {
+    boolean wasCreated = false;
     try {
-      String[] paths = getPaths(makeZnode(type, id));
-      boolean wasCreated = false;
-      for (String znode : paths) {
-        try {
-          zk.create(znode, new byte[0],
-            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-          wasCreated = true;
-        } catch (KeeperException.NodeExistsException e) {
+      zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeZnode(type, id));
+      wasCreated = true;
+    }
+    catch(KeeperException.NodeExistsException ex) {
+      //the top-level node for this jobId already exists, so there is nothing to create
+    }
+    catch(Exception ex) {
+      throw new IOException("Error creating " + makeZnode(type, id), ex);
+    }
+    if(wasCreated) {
+      try {
+        // Really not sure if this should go here.  Will have
+        // to see how the storage mechanism evolves.
+        if (type.equals(Type.JOB)) {
+          JobStateTracker jt = new JobStateTracker(id, zk, false, job_trackingpath);
+          jt.create();
         }
-      }
-      if (wasCreated) {
+      } catch (Exception e) {
+        LOG.error("Error tracking (jobId=" + id + "): " + e.getMessage());
+        // If we couldn't create the tracker node, don't create the main node.
         try {
-          // Really not sure if this should go here.  Will have
-          // to see how the storage mechanism evolves.
-          if (type.equals(Type.JOB)) {
-            JobStateTracker jt = new JobStateTracker(id, zk, false,
-              job_trackingpath);
-            jt.create();
-          }
-        } catch (Exception e) {
-          LOG.warn("Error tracking: " + e.getMessage());
-          // If we couldn't create the tracker node, don't
-          // create the main node.
-          zk.delete(makeZnode(type, id), -1);
+          zk.delete().forPath(makeZnode(type, id));//default version is -1
+        }
+        catch(Exception ex) {
+          //EK: it's not obvious that this is the right logic; e.g. if we don't record the
+          //'callback', we never notify the client of job completion
+          throw new IOException("Failed to delete " + makeZnode(type, id) + ":" + ex);
         }
       }
-      if (zk.exists(makeZnode(type, id), false) == null)
+    }
+    try {
+      if (zk.checkExists().forPath(makeZnode(type, id)) == null) {
         throw new IOException("Unable to create " + makeZnode(type, id));
-      if (wasCreated) {
-        try {
-          saveField(type, id, "created",
-            Long.toString(System.currentTimeMillis()));
-        } catch (NotFoundException nfe) {
-          // Wow, something's really wrong.
-          throw new IOException("Couldn't write to node " + id, nfe);
-        }
       }
-    } catch (KeeperException e) {
-      throw new IOException("Creating " + id, e);
-    } catch (InterruptedException e) {
-      throw new IOException("Creating " + id, e);
+    }
+    catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    if (wasCreated) {
+      try {
+        saveField(type, id, "created",
+          Long.toString(System.currentTimeMillis()));
+      } catch (NotFoundException nfe) {
+        // Wow, something's really wrong.
+        throw new IOException("Couldn't write to node " + id, nfe);
+      }
     }
   }
 
@@ -198,25 +195,14 @@ public class ZooKeeperStorage implements
 
   /**
    * A helper method that sets a field value.
-   * @param type
-   * @param id
-   * @param name
-   * @param val
-   * @throws KeeperException
-   * @throws UnsupportedEncodingException
-   * @throws InterruptedException
+   * @throws java.lang.Exception
    */
-  private void setFieldData(Type type, String id, String name, String val)
-    throws KeeperException, UnsupportedEncodingException, InterruptedException {
+  private void setFieldData(Type type, String id, String name, String val) throws Exception {
     try {
-      zk.create(makeFieldZnode(type, id, name),
-        val.getBytes(ENCODING),
-        Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
+      zk.create().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE)
+        .forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING));
     } catch (KeeperException.NodeExistsException e) {
-      zk.setData(makeFieldZnode(type, id, name),
-        val.getBytes(ENCODING),
-        -1);
+      zk.setData().forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING));
     }
   }
 
@@ -251,7 +237,7 @@ public class ZooKeeperStorage implements
   @Override
   public String getField(Type type, String id, String key) {
     try {
-      byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null);
+      byte[] b = zk.getData().forPath(makeFieldZnode(type, id, key));
       return new String(b, ENCODING);
     } catch (Exception e) {
       return null;
@@ -259,26 +245,12 @@ public class ZooKeeperStorage implements
   }
 
   @Override
-  public Map<String, String> getFields(Type type, String id) {
-    HashMap<String, String> map = new HashMap<String, String>();
-    try {
-      for (String node : zk.getChildren(makeZnode(type, id), false)) {
-        byte[] b = zk.getData(makeFieldZnode(type, id, node),
-          false, null);
-        map.put(node, new String(b, ENCODING));
-      }
-    } catch (Exception e) {
-      return map;
-    }
-    return map;
-  }
-
-  @Override
   public boolean delete(Type type, String id) throws NotFoundException {
     try {
-      for (String child : zk.getChildren(makeZnode(type, id), false)) {
+      
+      for (String child : zk.getChildren().forPath(makeZnode(type, id))) {
         try {
-          zk.delete(makeFieldZnode(type, id, child), -1);
+          zk.delete().forPath(makeFieldZnode(type, id, child));
         } catch (Exception e) {
           // Other nodes may be trying to delete this at the same time,
           // so just log errors and skip them.
@@ -287,7 +259,7 @@ public class ZooKeeperStorage implements
         }
       }
       try {
-        zk.delete(makeZnode(type, id), -1);
+        zk.delete().forPath(makeZnode(type, id));
       } catch (Exception e) {
         // Same thing -- might be deleted by other nodes, so just go on.
         throw new NotFoundException("Couldn't delete " +
@@ -302,58 +274,15 @@ public class ZooKeeperStorage implements
   }
 
   @Override
-  public List<String> getAll() {
-    ArrayList<String> allNodes = new ArrayList<String>();
-    for (Type type : Type.values()) {
-      allNodes.addAll(getAllForType(type));
-    }
-    return allNodes;
-  }
-
-  @Override
   public List<String> getAllForType(Type type) {
     try {
-      return zk.getChildren(getPath(type), false);
+      return zk.getChildren().forPath(getPath(type));
     } catch (Exception e) {
       return new ArrayList<String>();
     }
   }
 
   @Override
-  public List<String> getAllForKey(String key, String value) {
-    ArrayList<String> allNodes = new ArrayList<String>();
-    try {
-      for (Type type : Type.values()) {
-        allNodes.addAll(getAllForTypeAndKey(type, key, value));
-      }
-    } catch (Exception e) {
-      LOG.info("Couldn't find children.");
-    }
-    return allNodes;
-  }
-
-  @Override
-  public List<String> getAllForTypeAndKey(Type type, String key, String value) {
-    ArrayList<String> allNodes = new ArrayList<String>();
-    try {
-      for (String id : zk.getChildren(getPath(type), false)) {
-        for (String field : zk.getChildren(id, false)) {
-          if (field.endsWith("/" + key)) {
-            byte[] b = zk.getData(field, false, null);
-            if (new String(b, ENCODING).equals(value)) {
-              allNodes.add(id);
-            }
-          }
-        }
-      }
-    } catch (Exception e) {
-      // Log and go to the next type -- this one might not exist
-      LOG.info("Couldn't find children of " + getPath(type));
-    }
-    return allNodes;
-  }
-
-  @Override
   public void openStorage(Configuration config) throws IOException {
     storage_root = config.get(STORAGE_ROOT);
     job_path = storage_root + "/jobs";

Modified: hive/branches/branch-0.14/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/pom.xml?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/pom.xml (original)
+++ hive/branches/branch-0.14/pom.xml Mon Oct 20 21:38:05 2014
@@ -161,6 +161,7 @@
     <zookeeper.version>3.4.5</zookeeper.version>
     <jpam.version>1.1</jpam.version>
     <felix.version>2.4.0</felix.version>
+    <curator.version>2.5.0</curator.version>
   </properties>
 
   <repositories>
@@ -472,7 +473,13 @@
           </exclusion>
         </exclusions>
       </dependency>
-      <dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+
+        <dependency>
         <groupId>org.codehaus.groovy</groupId>
         <artifactId>groovy-all</artifactId>
         <version>${groovy.version}</version>