You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2014/10/20 23:38:05 UTC
svn commit: r1633234 - in /hive/branches/branch-0.14: ./
hcatalog/webhcat/svr/
hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/
Author: ekoifman
Date: Mon Oct 20 21:38:05 2014
New Revision: 1633234
URL: http://svn.apache.org/r1633234
Log:
HIVE-8387 add retry logic to ZooKeeperStorage in WebHCat
Modified:
hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml
hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java
hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
hive/branches/branch-0.14/pom.xml
Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/pom.xml Mon Oct 20 21:38:05 2014
@@ -38,7 +38,7 @@
</properties>
<dependencies>
- <!-- dependencies are always listed in sorted order by groupId, artifectId -->
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
<!-- intra-project -->
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
@@ -67,6 +67,14 @@
<artifactId>commons-exec</artifactId>
<version>${commons-exec.version}</version>
</dependency>
+
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -190,6 +198,37 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>include-curator</id>
+ <!-- WebHCat uses the Curator library to work with ZooKeeper, so Curator must be available
+ on whichever cluster node LaunchMapper happens to run on to actually execute the job.
+ The simplest way is to include it in the webhcat jar, which is shipped to the target node
+ since it contains LaunchMapper.java. -->
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>true</minimizeJar>
+ <artifactSet>
+ <includes>
+ <include>org.apache.curator</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.curator</pattern>
+ <shadedPattern>webhcat.org.apache.curator</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java Mon Oct 20 21:38:05 2014
@@ -25,9 +25,7 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -116,32 +114,6 @@ public class HDFSStorage implements Temp
}
@Override
- public Map<String, String> getFields(Type type, String id) {
- HashMap<String, String> map = new HashMap<String, String>();
- BufferedReader in = null;
- Path p = new Path(getPath(type) + "/" + id);
- try {
- for (FileStatus status : fs.listStatus(p)) {
- in = new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
- String line = null;
- String val = "";
- while ((line = in.readLine()) != null) {
- if (!val.equals("")) {
- val += "\n";
- }
- val += line;
- }
- map.put(status.getPath().getName(), val);
- }
- } catch (IOException e) {
- LOG.trace("Couldn't find " + p);
- } finally {
- close(in);
- }
- return map;
- }
-
- @Override
public boolean delete(Type type, String id) throws NotFoundException {
Path p = new Path(getPath(type) + "/" + id);
try {
@@ -153,14 +125,6 @@ public class HDFSStorage implements Temp
return false;
}
- @Override
- public List<String> getAll() {
- ArrayList<String> allNodes = new ArrayList<String>();
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForType(type));
- }
- return allNodes;
- }
@Override
public List<String> getAllForType(Type type) {
@@ -177,40 +141,6 @@ public class HDFSStorage implements Temp
}
@Override
- public List<String> getAllForKey(String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- try {
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForTypeAndKey(type, key, value));
- }
- } catch (Exception e) {
- LOG.trace("Couldn't find children for key " + key + ": " +
- e.getMessage());
- }
- return allNodes;
- }
-
- @Override
- public List<String> getAllForTypeAndKey(Type type, String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- HashMap<String, String> map = new HashMap<String, String>();
- try {
- for (FileStatus status :
- fs.listStatus(new Path(getPath(type)))) {
- map = (HashMap<String, String>)
- getFields(type, status.getPath().getName());
- if (map.get(key).equals(value)) {
- allNodes.add(status.getPath().getName());
- }
- }
- } catch (Exception e) {
- LOG.trace("Couldn't find children for key " + key + ": " +
- e.getMessage());
- }
- return allNodes;
- }
-
- @Override
public void openStorage(Configuration config) throws IOException {
storage_root = config.get(TempletonStorage.STORAGE_ROOT);
if (fs == null) {
Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java Mon Oct 20 21:38:05 2014
@@ -24,19 +24,30 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
+/*
+ * The general idea here is to create
+ * /created/1
+ * /created/2
+ * /created/3 ....
+ * for each job submitted. The node number is generated by ZK (PERSISTENT_SEQUENTIAL) and the
+ * payload is the JobId. Basically this keeps track of the order in which jobs were submitted,
+ * and ZooKeeperCleanup uses this to purge old job info.
+ * Since the /jobs/<id> node already has create/update timestamps
+ * (http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#sc_zkStatStructure), this
+ * whole tracking mechanism could eventually be removed.
+*/
public class JobStateTracker {
// The path to the tracking root
private String job_trackingroot = null;
// The zookeeper connection to use
- private ZooKeeper zk;
+ private CuratorFramework zk;
// The id of the tracking node -- must be a SEQUENTIAL node
private String trackingnode;
@@ -51,7 +62,7 @@ public class JobStateTracker {
* Constructor for a new node -- takes the jobid of an existing job
*
*/
- public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker,
+ public JobStateTracker(String node, CuratorFramework zk, boolean nodeIsTracker,
String job_trackingpath) {
this.zk = zk;
if (nodeIsTracker) {
@@ -65,30 +76,25 @@ public class JobStateTracker {
/**
* Create the parent znode for this job state.
*/
- public void create()
- throws IOException {
- String[] paths = ZooKeeperStorage.getPaths(job_trackingroot);
- for (String znode : paths) {
- try {
- zk.create(znode, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- } catch (Exception e) {
- throw new IOException("Unable to create parent nodes");
- }
+ public void create() throws IOException {
+ try {
+ zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .withACL(Ids.OPEN_ACL_UNSAFE).forPath(job_trackingroot);
+ } catch (KeeperException.NodeExistsException e) {
+ //root must exist already
+ } catch (Exception e) {
+ throw new IOException("Unable to create parent nodes");
}
try {
- trackingnode = zk.create(makeTrackingZnode(), jobid.getBytes(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+ trackingnode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
+ .withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeTrackingZnode(), jobid.getBytes());
} catch (Exception e) {
throw new IOException("Unable to create " + makeTrackingZnode());
}
}
-
- public void delete()
- throws IOException {
+ public void delete() throws IOException {
try {
- zk.delete(makeTrackingJobZnode(trackingnode), -1);
+ zk.delete().forPath(makeTrackingJobZnode(trackingnode));
} catch (Exception e) {
// Might have been deleted already
LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode));
@@ -101,13 +107,10 @@ public class JobStateTracker {
*/
public String getJobID() throws IOException {
try {
- return new String(zk.getData(makeTrackingJobZnode(trackingnode),
- false, new Stat()));
- } catch (KeeperException e) {
+ return new String(zk.getData().forPath(makeTrackingJobZnode(trackingnode)));
+ } catch (Exception e) {
// It was deleted during the transaction
- throw new IOException("Node already deleted " + trackingnode);
- } catch (InterruptedException e) {
- throw new IOException("Couldn't read node " + trackingnode);
+ throw new IOException("Node already deleted " + trackingnode, e);
}
}
@@ -129,13 +132,13 @@ public class JobStateTracker {
* Get the list of tracking jobs. These can be used to determine which jobs have
* expired.
*/
- public static List<String> getTrackingJobs(Configuration conf, ZooKeeper zk)
+ public static List<String> getTrackingJobs(Configuration conf, CuratorFramework zk)
throws IOException {
ArrayList<String> jobs = new ArrayList<String>();
try {
- for (String myid : zk.getChildren(
+ for (String myid : zk.getChildren().forPath(
conf.get(TempletonStorage.STORAGE_ROOT)
- + ZooKeeperStorage.TRACKINGDIR, false)) {
+ + ZooKeeperStorage.TRACKINGDIR)) {
jobs.add(myid);
}
} catch (Exception e) {
Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java Mon Oct 20 21:38:05 2014
@@ -20,7 +20,6 @@ package org.apache.hive.hcatalog.templet
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +44,7 @@ import org.apache.hadoop.conf.Configurat
public interface TempletonStorage {
// These are the possible types referenced by 'type' below.
public enum Type {
- UNKNOWN, JOB, JOBTRACKING, TEMPLETONOVERHEAD
+ UNKNOWN, JOB, JOBTRACKING
}
public static final String STORAGE_CLASS = "templeton.storage.class";
@@ -79,20 +78,6 @@ public interface TempletonStorage {
public String getField(Type type, String id, String key);
/**
- * Get all the name/value pairs stored for this id.
- * Be careful using getFields() -- optimistic locking will mean that
- * your odds of a conflict are decreased if you read/write one field
- * at a time. getFields() is intended for read-only usage.
- *
- * If the type is UNKNOWN, search for the id in all types.
- *
- * @param type The data type (as listed above)
- * @param id The String id of this data grouping (jobid, etc.)
- * @return A Map of key/value pairs found for this type/id.
- */
- public Map<String, String> getFields(Type type, String id);
-
- /**
* Delete a data grouping (all data for a jobid, all tracking data
* for a job, etc.). If the type is UNKNOWN, search for the id
* in all types.
@@ -105,13 +90,6 @@ public interface TempletonStorage {
public boolean delete(Type type, String id) throws NotFoundException;
/**
- * Get the id of each data grouping in the storage system.
- *
- * @return An ArrayList<String> of ids.
- */
- public List<String> getAll();
-
- /**
* Get the id of each data grouping of a given type in the storage
* system.
* @param type The data type (as listed above)
@@ -120,26 +98,6 @@ public interface TempletonStorage {
public List<String> getAllForType(Type type);
/**
- * Get the id of each data grouping that has the specific key/value
- * pair.
- * @param key The name of the field to search for
- * @param value The value of the field to search for
- * @return An ArrayList<String> of ids.
- */
- public List<String> getAllForKey(String key, String value);
-
- /**
- * Get the id of each data grouping of a given type that has the
- * specific key/value pair.
- * @param type The data type (as listed above)
- * @param key The name of the field to search for
- * @param value The value of the field to search for
- * @return An ArrayList<String> of ids.
- */
- public List<String> getAllForTypeAndKey(Type type, String key,
- String value);
-
- /**
* For storage methods that require a connection, this is a hint
* that it's time to open a connection.
*/
Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java Mon Oct 20 21:38:05 2014
@@ -24,8 +24,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Date;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -89,7 +89,7 @@ public class ZooKeeperCleanup extends Th
* @throws IOException
*/
public void run() {
- ZooKeeper zk = null;
+ CuratorFramework zk = null;
List<String> nodes = null;
isRunning = true;
while (!stop) {
@@ -112,13 +112,7 @@ public class ZooKeeperCleanup extends Th
} catch (Exception e) {
LOG.error("Cleanup cycle failed: " + e.getMessage());
} finally {
- if (zk != null) {
- try {
- zk.close();
- } catch (InterruptedException e) {
- // We're trying to exit anyway, just ignore.
- }
- }
+ if (zk != null) zk.close();
}
long sleepMillis = (long) (Math.random() * interval);
@@ -140,7 +134,7 @@ public class ZooKeeperCleanup extends Th
*
* @throws IOException
*/
- public List<String> getChildList(ZooKeeper zk) {
+ public List<String> getChildList(CuratorFramework zk) {
try {
List<String> jobs = JobStateTracker.getTrackingJobs(appConf, zk);
Collections.sort(jobs);
@@ -154,7 +148,7 @@ public class ZooKeeperCleanup extends Th
/**
* Check to see if a job is more than maxage old, and delete it if so.
*/
- public boolean checkAndDelete(String node, ZooKeeper zk) {
+ public boolean checkAndDelete(String node, CuratorFramework zk) {
JobState state = null;
try {
JobStateTracker tracker = new JobStateTracker(node, zk, true,
@@ -167,8 +161,11 @@ public class ZooKeeperCleanup extends Th
// an error in creation, and we want to delete it anyway.
long then = 0;
if (state.getCreated() != null) {
+ //this is set in ZooKeeperStorage.create()
then = state.getCreated();
}
+ //todo: this should check that the job actually completed and likely use completion time
+ //which is not tracked directly but available on /jobs/<id> node via "mtime" in Stat
if (now - then > maxage) {
LOG.info("Deleting " + tracker.getJobID());
state.delete();
@@ -177,7 +174,7 @@ public class ZooKeeperCleanup extends Th
}
return false;
} catch (Exception e) {
- LOG.info("checkAndDelete failed for " + node);
+ LOG.info("checkAndDelete failed for " + node + " due to: " + e.getMessage());
// We don't throw a new exception for this -- just keep going with the
// next one.
return true;
Modified: hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java Mon Oct 20 21:38:05 2014
@@ -19,21 +19,18 @@
package org.apache.hive.hcatalog.templeton.tool;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
/**
* A storage implementation based on storing everything in ZooKeeper.
@@ -60,29 +57,29 @@ public class ZooKeeperStorage implements
private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class);
- private ZooKeeper zk;
+ private CuratorFramework zk;
/**
* Open a ZooKeeper connection for the JobState.
*/
- public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout)
+ public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
throws IOException {
- return new ZooKeeper(zkHosts,
- zkSessionTimeout,
- new Watcher() {
- @Override
- synchronized public void process(WatchedEvent event) {
- }
- });
+ //TODO: decide whether a connection state listener is needed here and what it should do
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
+ CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
+ zk.start();
+ return zk;
}
/**
* Open a ZooKeeper connection for the JobState.
*/
- public static ZooKeeper zkOpen(Configuration conf)
- throws IOException {
+ public static CuratorFramework zkOpen(Configuration conf) throws IOException {
+ /*the odd-looking call to Builder below obtains Curator's default session timeout,
+ which Curator itself exposes as a system property*/
return zkOpen(conf.get(ZK_HOSTS),
- conf.getInt(ZK_SESSION_TIMEOUT, 30000));
+ conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
}
public ZooKeeperStorage() {
@@ -93,15 +90,9 @@ public class ZooKeeperStorage implements
/**
* Close this ZK connection.
*/
- public void close()
- throws IOException {
+ public void close() throws IOException {
if (zk != null) {
- try {
- zk.close();
- zk = null;
- } catch (InterruptedException e) {
- throw new IOException("Closing ZooKeeper connection", e);
- }
+ zk.close();
}
}
@@ -118,48 +109,54 @@ public class ZooKeeperStorage implements
*/
public void create(Type type, String id)
throws IOException {
+ boolean wasCreated = false;
try {
- String[] paths = getPaths(makeZnode(type, id));
- boolean wasCreated = false;
- for (String znode : paths) {
- try {
- zk.create(znode, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- wasCreated = true;
- } catch (KeeperException.NodeExistsException e) {
+ zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeZnode(type, id));
+ wasCreated = true;
+ }
+ catch(KeeperException.NodeExistsException ex) {
+ //the top level node for this jobId already exists, so wasCreated stays false
+ }
+ catch(Exception ex) {
+ throw new IOException("Error creating " + makeZnode(type, id), ex);
+ }
+ if(wasCreated) {
+ try {
+ // Really not sure if this should go here. Will have
+ // to see how the storage mechanism evolves.
+ if (type.equals(Type.JOB)) {
+ JobStateTracker jt = new JobStateTracker(id, zk, false, job_trackingpath);
+ jt.create();
}
- }
- if (wasCreated) {
+ } catch (Exception e) {
+ LOG.error("Error tracking (jobId=" + id + "): " + e.getMessage());
+ // If we couldn't create the tracker node, don't create the main node.
try {
- // Really not sure if this should go here. Will have
- // to see how the storage mechanism evolves.
- if (type.equals(Type.JOB)) {
- JobStateTracker jt = new JobStateTracker(id, zk, false,
- job_trackingpath);
- jt.create();
- }
- } catch (Exception e) {
- LOG.warn("Error tracking: " + e.getMessage());
- // If we couldn't create the tracker node, don't
- // create the main node.
- zk.delete(makeZnode(type, id), -1);
+ zk.delete().forPath(makeZnode(type, id));//default version is -1
+ }
+ catch(Exception ex) {
+ //EK: it's not obvious that this is the right logic, if we don't record the 'callback'
+ //for example and never notify the client of job completion
+ throw new IOException("Failed to delete " + makeZnode(type, id) + ":" + ex);
}
}
- if (zk.exists(makeZnode(type, id), false) == null)
+ }
+ try {
+ if (zk.checkExists().forPath(makeZnode(type, id)) == null) {
throw new IOException("Unable to create " + makeZnode(type, id));
- if (wasCreated) {
- try {
- saveField(type, id, "created",
- Long.toString(System.currentTimeMillis()));
- } catch (NotFoundException nfe) {
- // Wow, something's really wrong.
- throw new IOException("Couldn't write to node " + id, nfe);
- }
}
- } catch (KeeperException e) {
- throw new IOException("Creating " + id, e);
- } catch (InterruptedException e) {
- throw new IOException("Creating " + id, e);
+ }
+ catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ if (wasCreated) {
+ try {
+ saveField(type, id, "created",
+ Long.toString(System.currentTimeMillis()));
+ } catch (NotFoundException nfe) {
+ // Wow, something's really wrong.
+ throw new IOException("Couldn't write to node " + id, nfe);
+ }
}
}
@@ -198,25 +195,14 @@ public class ZooKeeperStorage implements
/**
* A helper method that sets a field value.
- * @param type
- * @param id
- * @param name
- * @param val
- * @throws KeeperException
- * @throws UnsupportedEncodingException
- * @throws InterruptedException
+ * @throws java.lang.Exception
*/
- private void setFieldData(Type type, String id, String name, String val)
- throws KeeperException, UnsupportedEncodingException, InterruptedException {
+ private void setFieldData(Type type, String id, String name, String val) throws Exception {
try {
- zk.create(makeFieldZnode(type, id, name),
- val.getBytes(ENCODING),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ zk.create().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE)
+ .forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING));
} catch (KeeperException.NodeExistsException e) {
- zk.setData(makeFieldZnode(type, id, name),
- val.getBytes(ENCODING),
- -1);
+ zk.setData().forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING));
}
}
@@ -251,7 +237,7 @@ public class ZooKeeperStorage implements
@Override
public String getField(Type type, String id, String key) {
try {
- byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null);
+ byte[] b = zk.getData().forPath(makeFieldZnode(type, id, key));
return new String(b, ENCODING);
} catch (Exception e) {
return null;
@@ -259,26 +245,12 @@ public class ZooKeeperStorage implements
}
@Override
- public Map<String, String> getFields(Type type, String id) {
- HashMap<String, String> map = new HashMap<String, String>();
- try {
- for (String node : zk.getChildren(makeZnode(type, id), false)) {
- byte[] b = zk.getData(makeFieldZnode(type, id, node),
- false, null);
- map.put(node, new String(b, ENCODING));
- }
- } catch (Exception e) {
- return map;
- }
- return map;
- }
-
- @Override
public boolean delete(Type type, String id) throws NotFoundException {
try {
- for (String child : zk.getChildren(makeZnode(type, id), false)) {
+
+ for (String child : zk.getChildren().forPath(makeZnode(type, id))) {
try {
- zk.delete(makeFieldZnode(type, id, child), -1);
+ zk.delete().forPath(makeFieldZnode(type, id, child));
} catch (Exception e) {
// Other nodes may be trying to delete this at the same time,
// so just log errors and skip them.
@@ -287,7 +259,7 @@ public class ZooKeeperStorage implements
}
}
try {
- zk.delete(makeZnode(type, id), -1);
+ zk.delete().forPath(makeZnode(type, id));
} catch (Exception e) {
// Same thing -- might be deleted by other nodes, so just go on.
throw new NotFoundException("Couldn't delete " +
@@ -302,58 +274,15 @@ public class ZooKeeperStorage implements
}
@Override
- public List<String> getAll() {
- ArrayList<String> allNodes = new ArrayList<String>();
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForType(type));
- }
- return allNodes;
- }
-
- @Override
public List<String> getAllForType(Type type) {
try {
- return zk.getChildren(getPath(type), false);
+ return zk.getChildren().forPath(getPath(type));
} catch (Exception e) {
return new ArrayList<String>();
}
}
@Override
- public List<String> getAllForKey(String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- try {
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForTypeAndKey(type, key, value));
- }
- } catch (Exception e) {
- LOG.info("Couldn't find children.");
- }
- return allNodes;
- }
-
- @Override
- public List<String> getAllForTypeAndKey(Type type, String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- try {
- for (String id : zk.getChildren(getPath(type), false)) {
- for (String field : zk.getChildren(id, false)) {
- if (field.endsWith("/" + key)) {
- byte[] b = zk.getData(field, false, null);
- if (new String(b, ENCODING).equals(value)) {
- allNodes.add(id);
- }
- }
- }
- }
- } catch (Exception e) {
- // Log and go to the next type -- this one might not exist
- LOG.info("Couldn't find children of " + getPath(type));
- }
- return allNodes;
- }
-
- @Override
public void openStorage(Configuration config) throws IOException {
storage_root = config.get(STORAGE_ROOT);
job_path = storage_root + "/jobs";
Modified: hive/branches/branch-0.14/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/pom.xml?rev=1633234&r1=1633233&r2=1633234&view=diff
==============================================================================
--- hive/branches/branch-0.14/pom.xml (original)
+++ hive/branches/branch-0.14/pom.xml Mon Oct 20 21:38:05 2014
@@ -161,6 +161,7 @@
<zookeeper.version>3.4.5</zookeeper.version>
<jpam.version>1.1</jpam.version>
<felix.version>2.4.0</felix.version>
+ <curator.version>2.5.0</curator.version>
</properties>
<repositories>
@@ -472,7 +473,13 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>${groovy.version}</version>