You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2012/07/25 20:29:49 UTC
svn commit: r1365722 [10/11] - in /incubator/hcatalog/trunk: ./ ant/ conf/
hcatalog-pig-adapter/ ivy/ src/docs/src/documentation/content/xdocs/
src/docs/src/documentation/content/xdocs/images/
src/java/org/apache/hcatalog/mapreduce/ src/test/e2e/temple...
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The persistent state of a job. The state is stored in one of the
+ * supported storage systems.
+ */
+public class JobState {
+
+ private static final Log LOG = LogFactory.getLog(JobState.class);
+
+ private String id;
+
+ // Storage is instantiated in the constructor
+ private TempletonStorage storage = null;
+
+ private static TempletonStorage.Type type = TempletonStorage.Type.JOB;
+
+ private Configuration config = null;
+
+ public JobState(String id, Configuration conf)
+ throws IOException
+ {
+ this.id = id;
+ config = conf;
+ storage = getStorage(conf);
+ }
+
+ public void delete()
+ throws IOException
+ {
+ try {
+ storage.delete(type, id);
+ } catch (Exception e) {
+ // Error getting children of node -- probably node has been deleted
+ LOG.info("Couldn't delete " + id);
+ }
+ }
+
+ /**
+ * Get an instance of the selected storage class. Defaults to
+ * HDFS storage if none is specified.
+ */
+ public static TempletonStorage getStorageInstance(Configuration conf) {
+ TempletonStorage storage = null;
+ try {
+ storage = (TempletonStorage)
+ Class.forName(conf.get(TempletonStorage.STORAGE_CLASS))
+ .newInstance();
+ } catch (Exception e) {
+ LOG.warn("No storage method found: " + e.getMessage());
+ try {
+ storage = new HDFSStorage();
+ } catch (Exception ex) {
+ LOG.error("Couldn't create storage.");
+ }
+ }
+ return storage;
+ }
+
+ /**
+ * Get an open instance of the selected storage class. Defaults
+ * to HDFS storage if none is specified.
+ */
+ public static TempletonStorage getStorage(Configuration conf) throws IOException {
+ TempletonStorage storage = getStorageInstance(conf);
+ storage.openStorage(conf);
+ return storage;
+ }
+
+ /**
+ * For storage methods that require a connection, this is a hint
+ * that it's time to close the connection.
+ */
+ public void close() throws IOException {
+ storage.closeStorage();
+ }
+
+ //
+ // Properties
+ //
+
+ /**
+ * This job id.
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * The percent complete of a job
+ */
+ public String getPercentComplete()
+ throws IOException
+ {
+ return getField("percentComplete");
+ }
+ public void setPercentComplete(String percent)
+ throws IOException
+ {
+ setField("percentComplete", percent);
+ }
+
+ /**
+ * The child id of TempletonControllerJob
+ */
+ public String getChildId()
+ throws IOException
+ {
+ return getField("childid");
+ }
+ public void setChildId(String childid)
+ throws IOException
+ {
+ setField("childid", childid);
+ }
+
+ /**
+ * Add a jobid to the list of children of this job.
+ *
+ * @param jobid
+ * @throws IOException
+ */
+ public void addChild(String jobid) throws IOException {
+ String jobids = "";
+ try {
+ jobids = getField("children");
+ } catch (Exception e) {
+ // There are none or they're not readable.
+ }
+ if (!jobids.equals("")) {
+ jobids += ",";
+ }
+ jobids += jobid;
+ setField("children", jobids);
+ }
+
+ /**
+ * Get a list of jobstates for jobs that are children of this job.
+ * @throws IOException
+ */
+ public List<JobState> getChildren() throws IOException {
+ ArrayList<JobState> children = new ArrayList<JobState>();
+ for (String jobid : getField("children").split(",")) {
+ children.add(new JobState(jobid, config));
+ }
+ return children;
+ }
+
+ /**
+ * Save a comma-separated list of jobids that are children
+ * of this job.
+ * @param jobids
+ * @throws IOException
+ */
+ public void setChildren(String jobids) throws IOException {
+ setField("children", jobids);
+ }
+
+ /**
+ * Set the list of child jobs of this job
+ * @param children
+ */
+ public void setChildren(List<JobState> children) throws IOException {
+ String val = "";
+ for (JobState jobstate : children) {
+ if (!val.equals("")) {
+ val += ",";
+ }
+ val += jobstate.getId();
+ }
+ setField("children", val);
+ }
+
+ /**
+ * The system exit value of the job.
+ */
+ public Long getExitValue()
+ throws IOException
+ {
+ return getLongField("exitValue");
+ }
+ public void setExitValue(long exitValue)
+ throws IOException
+ {
+ setLongField("exitValue", exitValue);
+ }
+
+ /**
+ * When this job was created.
+ */
+ public Long getCreated()
+ throws IOException
+ {
+ return getLongField("created");
+ }
+ public void setCreated(long created)
+ throws IOException
+ {
+ setLongField("created", created);
+ }
+
+ /**
+ * The user who started this job.
+ */
+ public String getUser()
+ throws IOException
+ {
+ return getField("user");
+ }
+ public void setUser(String user)
+ throws IOException
+ {
+ setField("user", user);
+ }
+
+ /**
+ * The url callback
+ */
+ public String getCallback()
+ throws IOException
+ {
+ return getField("callback");
+ }
+ public void setCallback(String callback)
+ throws IOException
+ {
+ setField("callback", callback);
+ }
+
+ /**
+ * The status of a job once it is completed.
+ */
+ public String getCompleteStatus()
+ throws IOException
+ {
+ return getField("completed");
+ }
+ public void setCompleteStatus(String complete)
+ throws IOException
+ {
+ setField("completed", complete);
+ }
+
+ /**
+ * The time when the callback was sent.
+ */
+ public Long getNotifiedTime()
+ throws IOException
+ {
+ return getLongField("notified");
+ }
+ public void setNotifiedTime(long notified)
+ throws IOException
+ {
+ setLongField("notified", notified);
+ }
+
+ //
+ // Helpers
+ //
+
+ /**
+ * Fetch an integer field from the store.
+ */
+ public Long getLongField(String name)
+ throws IOException
+ {
+ String s = storage.getField(type, id, name);
+ if (s == null)
+ return null;
+ else {
+ try {
+ return new Long(s);
+ } catch (NumberFormatException e) {
+ LOG.error("templeton: bug " + name + " " + s + " : "+ e);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Store a String field from the store.
+ */
+ public void setField(String name, String val)
+ throws IOException
+ {
+ try {
+ storage.saveField(type, id, name, val);
+ } catch (NotFoundException ne) {
+ throw new IOException(ne.getMessage());
+ }
+ }
+
+ public String getField(String name)
+ throws IOException
+ {
+ return storage.getField(type, id, name);
+ }
+
+ /**
+ * Store a long field.
+ *
+ * @param name
+ * @param val
+ * @throws IOException
+ */
+ public void setLongField(String name, long val)
+ throws IOException
+ {
+ try {
+ storage.saveField(type, id, name, String.valueOf(val));
+ } catch (NotFoundException ne) {
+ throw new IOException("Job " + id + " was not found: " +
+ ne.getMessage());
+ }
+ }
+
+ /**
+ * Get an id for each currently existing job, which can be used to create
+ * a JobState object.
+ *
+ * @param conf
+ * @throws IOException
+ */
+ public static List<String> getJobs(Configuration conf) throws IOException {
+ try {
+ return getStorage(conf).getAllForType(type);
+ } catch (Exception e) {
+ throw new IOException("Can't get jobs", e);
+ }
+ }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class JobStateTracker {
+ // The path to the tracking root
+ private String job_trackingroot = null;
+
+ // The zookeeper connection to use
+ private ZooKeeper zk;
+
+ // The id of the tracking node -- must be a SEQUENTIAL node
+ private String trackingnode;
+
+ // The id of the job this tracking node represents
+ private String jobid;
+
+ // The logger
+ private static final Log LOG = LogFactory.getLog(JobStateTracker.class);
+
+ /**
+ * Constructor for a new node -- takes the jobid of an existing job
+ *
+ */
+ public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker,
+ String job_trackingpath) {
+ this.zk = zk;
+ if (nodeIsTracker) {
+ trackingnode = node;
+ } else {
+ jobid = node;
+ }
+ job_trackingroot = job_trackingpath;
+ }
+
+ /**
+ * Create the parent znode for this job state.
+ */
+ public void create()
+ throws IOException
+ {
+ String[] paths = ZooKeeperStorage.getPaths(job_trackingroot);
+ for (String znode : paths) {
+ try {
+ zk.create(znode, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException e) {
+ } catch (Exception e) {
+ throw new IOException("Unable to create parent nodes");
+ }
+ }
+ try {
+ trackingnode = zk.create(makeTrackingZnode(), jobid.getBytes(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+ } catch (Exception e) {
+ throw new IOException("Unable to create " + makeTrackingZnode());
+ }
+ }
+
+ public void delete()
+ throws IOException
+ {
+ try {
+ zk.delete(makeTrackingJobZnode(trackingnode), -1);
+ } catch (Exception e) {
+ // Might have been deleted already
+ LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode));
+ }
+ }
+
+ /**
+ * Get the jobid for this tracking node
+ * @throws IOException
+ */
+ public String getJobID() throws IOException {
+ try {
+ return new String(zk.getData(makeTrackingJobZnode(trackingnode),
+ false, new Stat()));
+ } catch (KeeperException e) {
+ // It was deleted during the transaction
+ throw new IOException("Node already deleted " + trackingnode);
+ } catch (InterruptedException e) {
+ throw new IOException("Couldn't read node " + trackingnode);
+ }
+ }
+
+ /**
+ * Make a ZK path to a new tracking node
+ */
+ public String makeTrackingZnode() {
+ return job_trackingroot + "/";
+ }
+
+ /**
+ * Make a ZK path to an existing tracking node
+ */
+ public String makeTrackingJobZnode(String nodename) {
+ return job_trackingroot + "/" + nodename;
+ }
+
+ /*
+ * Get the list of tracking jobs. These can be used to determine which jobs have
+ * expired.
+ */
+ public static List<String> getTrackingJobs(Configuration conf, ZooKeeper zk)
+ throws IOException {
+ ArrayList<String> jobs = new ArrayList<String>();
+ try {
+ for (String myid : zk.getChildren(
+ conf.get(TempletonStorage.STORAGE_ROOT)
+ + ZooKeeperStorage.TRACKINGDIR, false)) {
+ jobs.add(myid);
+ }
+ } catch (Exception e) {
+ throw new IOException("Can't get tracking children", e);
+ }
+ return jobs;
+ }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
/**
 * Checked exception reporting that a requested item does not exist; the
 * storage layer's saveField, for example, declares it.
 */
public class NotFoundException extends Exception {

    private static final long serialVersionUID = 1L;

    /**
     * @param message description of what could not be found
     */
    public NotFoundException(String message) {
        super(message);
    }
}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An empty record reader.
+ */
+public class NullRecordReader
+ extends RecordReader<NullWritable, NullWritable>
+{
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+ throws IOException
+ {}
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public NullWritable getCurrentKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public float getProgress() { return 1.0f; }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ return false;
+ }
+}
+
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * An empty splitter.
+ */
+public class NullSplit extends InputSplit implements Writable {
+ public long getLength() { return 0; }
+
+ public String[] getLocations() throws IOException {
+ return new String[]{};
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {}
+
+ @Override
+ public void readFields(DataInput in) throws IOException {}
+}
+
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An empty InputFormat.
+ */
+public class SingleInputFormat
+ extends InputFormat<NullWritable, NullWritable>
+{
+ public List<InputSplit> getSplits(JobContext job)
+ throws IOException
+ {
+ List<InputSplit> res = new ArrayList<InputSplit>();
+ res.add(new NullSplit());
+ return res;
+ }
+
+ public RecordReader<NullWritable, NullWritable>
+ createRecordReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException
+ {
+ return new NullRecordReader();
+ }
+}
+
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.templeton.SecureProxySupport;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+
+/**
+ * A Map Reduce job that will start another job.
+ *
+ * We have a single Mapper job that starts a child MR job. The parent
+ * monitors the child child job and ends when the child job exits. In
+ * addition, we
+ *
+ * - write out the parent job id so the caller can record it.
+ * - run a keep alive thread so the job doesn't end.
+ * - Optionally, store the stdout, stderr, and exit value of the child
+ * in hdfs files.
+ */
+public class TempletonControllerJob extends Configured implements Tool {
+ static enum ControllerCounters { SIMPLE_COUNTER };
+
+ public static final String COPY_NAME = "templeton.copy";
+ public static final String STATUSDIR_NAME = "templeton.statusdir";
+ public static final String JAR_ARGS_NAME = "templeton.args";
+ public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
+
+ public static final String STDOUT_FNAME = "stdout";
+ public static final String STDERR_FNAME = "stderr";
+ public static final String EXIT_FNAME = "exit";
+
+ public static final int WATCHER_TIMEOUT_SECS = 10;
+ public static final int KEEP_ALIVE_MSEC = 60 * 1000;
+
+ private static TrivialExecService execService = TrivialExecService.getInstance();
+
+ private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class);
+
+
+ public static class LaunchMapper
+ extends Mapper<NullWritable, NullWritable, Text, Text>
+ {
+ protected Process startJob(Context context, String user,
+ String overrideClasspath)
+ throws IOException, InterruptedException
+ {
+ Configuration conf = context.getConfiguration();
+ copyLocal(COPY_NAME, conf);
+ String[] jarArgs
+ = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
+
+ ArrayList<String> removeEnv = new ArrayList<String>();
+ removeEnv.add("HADOOP_ROOT_LOGGER");
+ Map<String, String> env = TempletonUtils.hadoopUserEnv(user,
+ overrideClasspath);
+ List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
+ String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+ if(tokenFile != null){
+ jarArgsList.add(1, "-Dmapreduce.job.credentials.binary=" + tokenFile );
+ }
+ return execService.run(jarArgsList, removeEnv, env);
+ }
+
+ private void copyLocal(String var, Configuration conf)
+ throws IOException
+ {
+ String[] filenames = TempletonUtils.decodeArray(conf.get(var));
+ if (filenames != null) {
+ for (String filename : filenames) {
+ Path src = new Path(filename);
+ Path dst = new Path(src.getName());
+ FileSystem fs = src.getFileSystem(conf);
+ System.err.println("templeton: copy " + src + " => " + dst);
+ fs.copyToLocalFile(src, dst);
+ }
+ }
+ }
+
+ @Override
+ public void run(Context context)
+ throws IOException, InterruptedException
+ {
+
+ Configuration conf = context.getConfiguration();
+
+ Process proc = startJob(context,
+ conf.get("user.name"),
+ conf.get(OVERRIDE_CLASSPATH));
+
+ String statusdir = conf.get(STATUSDIR_NAME);
+ Counter cnt = context.getCounter(ControllerCounters.SIMPLE_COUNTER);
+
+ ExecutorService pool = Executors.newCachedThreadPool();
+ executeWatcher(pool, conf, context.getJobID(),
+ proc.getInputStream(), statusdir, STDOUT_FNAME);
+ executeWatcher(pool, conf, context.getJobID(),
+ proc.getErrorStream(), statusdir, STDERR_FNAME);
+ KeepAlive keepAlive = startCounterKeepAlive(pool, cnt);
+
+ proc.waitFor();
+ keepAlive.sendReport = false;
+ pool.shutdown();
+ if (! pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS))
+ pool.shutdownNow();
+
+ writeExitValue(conf, proc.exitValue(), statusdir);
+ JobState state = new JobState(context.getJobID().toString(), conf);
+ state.setExitValue(proc.exitValue());
+ state.setCompleteStatus("done");
+ state.close();
+
+ if (proc.exitValue() != 0)
+ System.err.println("templeton: job failed with exit code "
+ + proc.exitValue());
+ else
+ System.err.println("templeton: job completed with exit code 0");
+ }
+
+ private void executeWatcher(ExecutorService pool, Configuration conf,
+ JobID jobid, InputStream in, String statusdir,
+ String name)
+ throws IOException
+ {
+ Watcher w = new Watcher(conf, jobid, in, statusdir, name);
+ pool.execute(w);
+ }
+
+ private KeepAlive startCounterKeepAlive(ExecutorService pool, Counter cnt)
+ throws IOException
+ {
+ KeepAlive k = new KeepAlive(cnt);
+ pool.execute(k);
+ return k;
+ }
+
+ private void writeExitValue(Configuration conf, int exitValue, String statusdir)
+ throws IOException
+ {
+ if (TempletonUtils.isset(statusdir)) {
+ Path p = new Path(statusdir, EXIT_FNAME);
+ FileSystem fs = p.getFileSystem(conf);
+ OutputStream out = fs.create(p);
+ System.err.println("templeton: Writing exit value "
+ + exitValue + " to " + p);
+ PrintWriter writer = new PrintWriter(out);
+ writer.println(exitValue);
+ writer.close();
+ }
+ }
+ }
+
+ public static class Watcher implements Runnable {
+ private InputStream in;
+ private OutputStream out;
+ private JobID jobid;
+ private Configuration conf;
+
+ public Watcher(Configuration conf, JobID jobid, InputStream in,
+ String statusdir, String name)
+ throws IOException
+ {
+ this.conf = conf;
+ this.jobid = jobid;
+ this.in = in;
+
+ if (name.equals(STDERR_FNAME))
+ out = System.err;
+ else
+ out = System.out;
+
+ if (TempletonUtils.isset(statusdir)) {
+ Path p = new Path(statusdir, name);
+ FileSystem fs = p.getFileSystem(conf);
+ out = fs.create(p);
+ System.err.println("templeton: Writing status to " + p);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ InputStreamReader isr = new InputStreamReader(in);
+ BufferedReader reader = new BufferedReader(isr);
+ PrintWriter writer = new PrintWriter(out);
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ writer.println(line);
+ JobState state = null;
+ try {
+ String percent = TempletonUtils.extractPercentComplete(line);
+ String childid = TempletonUtils.extractChildJobId(line);
+
+ if (percent != null || childid != null) {
+ state = new JobState(jobid.toString(), conf);
+ state.setPercentComplete(percent);
+ state.setChildId(childid);
+ }
+ } catch (IOException e) {
+ System.err.println("templeton: state error: " + e);
+ } finally {
+ if (state != null) {
+ try {
+ state.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ }
+ writer.flush();
+ } catch (IOException e) {
+ System.err.println("templeton: execute error: " + e);
+ }
+ }
+ }
+
+ public static class KeepAlive implements Runnable {
+ private Counter cnt;
+ public boolean sendReport;
+
+ public KeepAlive(Counter cnt)
+ {
+ this.cnt = cnt;
+ this.sendReport = true;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (sendReport) {
+ cnt.increment(1);
+ Thread.sleep(KEEP_ALIVE_MSEC);
+ }
+ } catch (InterruptedException e) {
+ // Ok to be interrupted
+ }
+ }
+ }
+
+ private JobID submittedJobId;
+ public String getSubmittedId() {
+ if (submittedJobId == null)
+ return null;
+ else
+ return submittedJobId.toString();
+ }
+
+ /**
+ * Enqueue the job and print out the job id for later collection.
+ */
+ @Override
+ public int run(String[] args)
+ throws IOException, InterruptedException, ClassNotFoundException
+ {
+ Configuration conf = getConf();
+ conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));
+ conf.set("user.name", UserGroupInformation.getCurrentUser().getShortUserName());
+ Job job = new Job(conf);
+ job.setJarByClass(TempletonControllerJob.class);
+ job.setJobName("TempletonControllerJob");
+ job.setMapperClass(LaunchMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setInputFormatClass(SingleInputFormat.class);
+ NullOutputFormat<NullWritable, NullWritable> of
+ = new NullOutputFormat<NullWritable, NullWritable>();
+ job.setOutputFormatClass(of.getClass());
+ job.setNumReduceTasks(0);
+
+ JobClient jc = new JobClient(new JobConf(job.getConfiguration()));
+
+ Token<DelegationTokenIdentifier> mrdt = jc.getDelegationToken(new Text("mr token"));
+ job.getCredentials().addToken(new Text("mr token"), mrdt);
+ job.submit();
+
+ submittedJobId = job.getJobID();
+
+ return 0;
+ }
+
+
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new TempletonControllerJob(), args);
+ if (ret != 0)
+ System.err.println("TempletonControllerJob failed!");
+ System.exit(ret);
+ }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An interface to handle different Templeton storage methods, including
+ * ZooKeeper and HDFS. Any storage scheme must be able to handle being
+ * run in an HDFS environment, where specific file systems and virtual
+ * machines may not be available.
+ *
+ * Storage is done individually in a hierarchy: type (the data type,
+ * as listed below), then the id (a given jobid, jobtrackingid, etc.),
+ * then the key/value pairs. So an entry might look like:
+ *
+ * JOB
+ * jobid00035
+ * user -> rachel
+ * datecreated -> 2/5/12
+ * etc.
+ *
+ * Each field must be available to be fetched/changed individually.
+ */
+public interface TempletonStorage {
+ // These are the possible types referenced by 'type' below.
+ public enum Type {
+ UNKNOWN, JOB, JOBTRACKING, TEMPLETONOVERHEAD
+ }
+
+ public static final String STORAGE_CLASS = "templeton.storage.class";
+ public static final String STORAGE_ROOT = "templeton.storage.root";
+
+ /**
+ * Start the cleanup process for this storage type.
+ * @param config
+ */
+ public void startCleanup(Configuration config);
+
+ /**
+ * Save a single key/value pair for a specific job id.
+ * @param type The data type (as listed above)
+ * @param id The String id of this data grouping (jobid, etc.)
+ * @param key The name of the field to save
+ * @param val The value of the field to save
+ */
+ public void saveField(Type type, String id, String key, String val)
+ throws NotFoundException;
+
+ /**
+ * Get the value of one field for a given data type. If the type
+ * is UNKNOWN, search for the id in all types.
+ * @param type The data type (as listed above)
+ * @param id The String id of this data grouping (jobid, etc.)
+ * @param key The name of the field to retrieve
+ * @return The value of the field requested, or null if not
+ * found.
+ */
+ public String getField(Type type, String id, String key);
+
+ /**
+ * Get all the name/value pairs stored for this id.
+ * Be careful using getFields() -- optimistic locking will mean that
+ * your odds of a conflict are decreased if you read/write one field
+ * at a time. getFields() is intended for read-only usage.
+ *
+ * If the type is UNKNOWN, search for the id in all types.
+ *
+ * @param type The data type (as listed above)
+ * @param id The String id of this data grouping (jobid, etc.)
+ * @return A Map of key/value pairs found for this type/id.
+ */
+ public Map<String, String> getFields(Type type, String id);
+
+ /**
+ * Delete a data grouping (all data for a jobid, all tracking data
+ * for a job, etc.). If the type is UNKNOWN, search for the id
+ * in all types.
+ *
+ * @param type The data type (as listed above)
+ * @param id The String id of this data grouping (jobid, etc.)
+ * @return True if successful, false if not, throws NotFoundException
+ * if the id wasn't found.
+ */
+ public boolean delete(Type type, String id) throws NotFoundException;
+
+ /**
+ * Get the id of each data grouping in the storage system.
+ *
+ * @return An ArrayList<String> of ids.
+ */
+ public List<String> getAll();
+
+ /**
+ * Get the id of each data grouping of a given type in the storage
+ * system.
+ * @param type The data type (as listed above)
+ * @return An ArrayList<String> of ids.
+ */
+ public List<String> getAllForType(Type type);
+
+ /**
+ * Get the id of each data grouping that has the specific key/value
+ * pair.
+ * @param key The name of the field to search for
+ * @param value The value of the field to search for
+ * @return An ArrayList<String> of ids.
+ */
+ public List<String> getAllForKey(String key, String value);
+
+ /**
+ * Get the id of each data grouping of a given type that has the
+ * specific key/value pair.
+ * @param type The data type (as listed above)
+ * @param key The name of the field to search for
+ * @param value The value of the field to search for
+ * @return An ArrayList<String> of ids.
+ */
+ public List<String> getAllForTypeAndKey(Type type, String key,
+ String value);
+
+ /**
+ * For storage methods that require a connection, this is a hint
+ * that it's time to open a connection.
+ */
+ public void openStorage(Configuration config) throws IOException;
+
+ /**
+ * For storage methods that require a connection, this is a hint
+ * that it's time to close the connection.
+ */
+ public void closeStorage() throws IOException;
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * General utility methods.
+ */
+public class TempletonUtils {
+ /**
+ * Is the object non-empty?
+ */
+ public static boolean isset(String s) {
+ return (s != null) && (s.length() > 0);
+ }
+
+ /**
+ * Is the object non-empty?
+ */
+ public static boolean isset(char ch) {
+ return (ch != 0);
+ }
+
+ /**
+ * Is the object non-empty?
+ */
+ public static <T> boolean isset(T[] a) {
+ return (a != null) && (a.length > 0);
+ }
+
+
+ /**
+ * Is the object non-empty?
+ */
+ public static <T> boolean isset(Collection<T> col) {
+ return (col != null) && (! col.isEmpty());
+ }
+
+ /**
+ * Is the object non-empty?
+ */
+ public static <K, V> boolean isset(Map<K, V> col) {
+ return (col != null) && (! col.isEmpty());
+ }
+
+
+ public static final Pattern JAR_COMPLETE
+ = Pattern.compile(" map \\d+%\\s+reduce \\d+%$");
+ public static final Pattern PIG_COMPLETE = Pattern.compile(" \\d+% complete$");
+
+ /**
+ * Extract the percent complete line from Pig or Jar jobs.
+ */
+ public static String extractPercentComplete(String line) {
+ Matcher jar = JAR_COMPLETE.matcher(line);
+ if (jar.find())
+ return jar.group().trim();
+
+ Matcher pig = PIG_COMPLETE.matcher(line);
+ if (pig.find())
+ return pig.group().trim();
+
+ return null;
+ }
+
+ public static final Pattern JAR_ID = Pattern.compile(" Running job: (\\S+)$");
+ public static final Pattern PIG_ID = Pattern.compile(" HadoopJobId: (\\S+)$");
+ public static final Pattern[] ID_PATTERNS = {JAR_ID, PIG_ID};
+
+ /**
+ * Extract the job id from jar jobs.
+ */
+ public static String extractChildJobId(String line) {
+ for (Pattern p : ID_PATTERNS) {
+ Matcher m = p.matcher(line);
+ if (m.find())
+ return m.group(1);
+ }
+
+ return null;
+ }
+
+ /**
+ * Take an array of strings and encode it into one string.
+ */
+ public static String encodeArray(String[] plain) {
+ if (plain == null)
+ return null;
+
+ String[] escaped = new String[plain.length];
+
+ for (int i = 0; i < plain.length; ++i) {
+ if (plain[i] == null) {
+ plain[i] = "";
+ }
+ escaped[i] = StringUtils.escapeString(plain[i]);
+ }
+
+ return StringUtils.arrayToString(escaped);
+ }
+
+ /**
+ * Encode a List into a string.
+ */
+ public static String encodeArray(List<String> list) {
+ if (list == null)
+ return null;
+ String[] array = new String[list.size()];
+ return encodeArray(list.toArray(array));
+ }
+
+ /**
+ * Take an encode strings and decode it into an array of strings.
+ */
+ public static String[] decodeArray(String s) {
+ if (s == null)
+ return null;
+
+ String[] escaped = StringUtils.split(s);
+ String[] plain = new String[escaped.length];
+
+ for (int i = 0; i < escaped.length; ++i)
+ plain[i] = StringUtils.unEscapeString(escaped[i]);
+
+ return plain;
+ }
+
+ public static String[] hadoopFsListAsArray(String files, Configuration conf,
+ String user)
+ throws URISyntaxException, FileNotFoundException, IOException,
+ InterruptedException
+ {
+ if (files == null || conf == null) {
+ return null;
+ }
+ String[] dirty = files.split(",");
+ String[] clean = new String[dirty.length];
+
+ for (int i = 0; i < dirty.length; ++i)
+ clean[i] = hadoopFsFilename(dirty[i], conf, user);
+
+ return clean;
+ }
+
+ public static String hadoopFsListAsString(String files, Configuration conf,
+ String user)
+ throws URISyntaxException, FileNotFoundException, IOException,
+ InterruptedException
+ {
+ if (files == null || conf == null) {
+ return null;
+ }
+ return StringUtils.arrayToString(hadoopFsListAsArray(files, conf, user));
+ }
+
+ public static String hadoopFsFilename(String fname, Configuration conf, String user)
+ throws URISyntaxException, FileNotFoundException, IOException,
+ InterruptedException
+ {
+ Path p = hadoopFsPath(fname, conf, user);
+ if (p == null)
+ return null;
+ else
+ return p.toString();
+ }
+
+ /**
+ * @return true iff we are sure the file is not there.
+ */
+ public static boolean hadoopFsIsMissing(FileSystem fs, Path p) {
+ try {
+ return ! fs.exists(p);
+ } catch(Throwable t) {
+ // Got an error, might be there anyway due to a
+ // permissions problem.
+ return false;
+ }
+ }
+
+ public static Path hadoopFsPath(String fname, Configuration conf, String user)
+ throws URISyntaxException, FileNotFoundException, IOException,
+ InterruptedException
+ {
+ if (fname == null || conf == null) {
+ return null;
+ }
+ FileSystem defaultFs = FileSystem.get(new URI(fname), conf, user);
+ URI u = new URI(fname);
+ Path p = new Path(u).makeQualified(defaultFs);
+
+ FileSystem fs = p.getFileSystem(conf);
+ if (hadoopFsIsMissing(fs, p))
+ throw new FileNotFoundException("File " + fname + " does not exist.");
+
+ return p;
+ }
+
+ /**
+ * GET the given url. Returns the number of bytes received.
+ */
+ public static int fetchUrl(URL url)
+ throws IOException
+ {
+ URLConnection cnx = url.openConnection();
+ InputStream in = cnx.getInputStream();
+
+ byte[] buf = new byte[8192];
+ int total = 0;
+ int len = 0;
+ while ((len = in.read(buf)) >= 0)
+ total += len;
+
+ return total;
+ }
+
+ /**
+ * Set the environment variables to specify the hadoop user.
+ */
+ public static Map<String, String> hadoopUserEnv(String user,
+ String overrideClasspath)
+ {
+ HashMap<String, String> env = new HashMap<String, String>();
+ env.put("HADOOP_USER_NAME", user);
+
+ if (overrideClasspath != null) {
+ env.put("HADOOP_USER_CLASSPATH_FIRST", "true");
+ String cur = System.getenv("HADOOP_CLASSPATH");
+ if (TempletonUtils.isset(cur))
+ overrideClasspath = overrideClasspath + ":" + cur;
+ env.put("HADOOP_CLASSPATH", overrideClasspath);
+ }
+
+ return env;
+ }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
/**
 * Runs programs on the local machine. A single shared instance is
 * handed out by {@link #getInstance()}.
 */
public class TrivialExecService {
    private static volatile TrivialExecService theSingleton;

    /**
     * Return the shared instance, creating it on the first call.
     */
    public static synchronized TrivialExecService getInstance() {
        if (theSingleton != null) {
            return theSingleton;
        }
        theSingleton = new TrivialExecService();
        return theSingleton;
    }

    /**
     * Start {@code cmd} as a child process.
     *
     * The child inherits this process's environment, except that every
     * name in {@code removeEnv} is removed and then every entry of
     * {@code environmentVariables} is added (overriding inherited values).
     * The command and the added variables are echoed to stderr.
     *
     * @param cmd the program and its arguments
     * @param removeEnv names of inherited variables to drop (must not be null)
     * @param environmentVariables variables to set (must not be null)
     * @return the started process
     * @throws IOException if the process cannot be started
     */
    public Process run(List<String> cmd, List<String> removeEnv,
                       Map<String, String> environmentVariables)
        throws IOException
    {
        System.err.println("templeton: starting " + cmd);
        System.err.print("With environment variables: ");
        for (Map.Entry<String, String> entry : environmentVariables.entrySet()) {
            System.err.println(entry.getKey() + "=" + entry.getValue());
        }

        ProcessBuilder builder = new ProcessBuilder(cmd);
        Map<String, String> childEnv = builder.environment();
        for (String name : removeEnv) {
            childEnv.remove(name);
        }
        childEnv.putAll(environmentVariables);
        return builder.start();
    }

}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This does periodic cleanup
+ */
+public class ZooKeeperCleanup extends Thread {
+ protected Configuration appConf;
+
+ // The interval to wake up and check the queue
+ public static final String ZK_CLEANUP_INTERVAL =
+ "templeton.zookeeper.cleanup.interval"; // 12 hours
+
+ // The max age of a task allowed
+ public static final String ZK_CLEANUP_MAX_AGE =
+ "templeton.zookeeper.cleanup.maxage"; // ~ 1 week
+
+ protected static long interval = 1000L * 60L * 60L * 12L;
+ protected static long maxage = 1000L * 60L * 60L * 24L * 7L;
+
+ // The logger
+ private static final Log LOG = LogFactory.getLog(ZooKeeperCleanup.class);
+
+ // Handle to cancel loop
+ private boolean stop = false;
+
+ // The instance
+ private static ZooKeeperCleanup thisclass = null;
+
+ // Whether the cycle is running
+ private static boolean isRunning = false;
+
+ /**
+ * Create a cleanup object. We use the appConfig to configure JobState.
+ * @param appConf
+ */
+ private ZooKeeperCleanup(Configuration appConf) {
+ this.appConf = appConf;
+ interval = appConf.getLong(ZK_CLEANUP_INTERVAL, interval);
+ maxage = appConf.getLong(ZK_CLEANUP_MAX_AGE, maxage);
+ }
+
+ public static ZooKeeperCleanup getInstance(Configuration appConf) {
+ if (thisclass != null) {
+ return thisclass;
+ }
+ thisclass = new ZooKeeperCleanup(appConf);
+ return thisclass;
+ }
+
+ public static void startInstance(Configuration appConf) throws IOException {
+ if (! isRunning) {
+ getInstance(appConf).start();
+ }
+ }
+
+ /**
+ * Run the cleanup loop.
+ *
+ * @throws IOException
+ */
+ public void run() {
+ ZooKeeper zk = null;
+ List<String> nodes = null;
+ isRunning = true;
+ while (!stop) {
+ try {
+ // Put each check in a separate try/catch, so if that particular
+ // cycle fails, it'll try again on the next cycle.
+ try {
+ zk = ZooKeeperStorage.zkOpen(appConf);
+
+ nodes = getChildList(zk);
+
+ for (String node : nodes) {
+ boolean deleted = checkAndDelete(node, zk);
+ if (!deleted) {
+ break;
+ }
+ }
+
+ zk.close();
+ } catch (Exception e) {
+ LOG.error("Cleanup cycle failed: " + e.getMessage());
+ } finally {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ // We're trying to exit anyway, just ignore.
+ }
+ }
+ }
+
+ long sleepMillis = (long) (Math.random() * interval);
+ LOG.info("Next execution: " + new Date(new Date().getTime()
+ + sleepMillis));
+ Thread.sleep(sleepMillis);
+
+ } catch (Exception e) {
+ // If sleep fails, we should exit now before things get worse.
+ isRunning = false;
+ LOG.error("Cleanup failed: " + e.getMessage(), e);
+ }
+ }
+ isRunning = false;
+ }
+
+ /**
+ * Get the list of jobs from JobState
+ *
+ * @throws IOException
+ */
+ public List<String> getChildList(ZooKeeper zk) {
+ try {
+ List<String> jobs = JobStateTracker.getTrackingJobs(appConf, zk);
+ Collections.sort(jobs);
+ return jobs;
+ } catch (IOException e) {
+ LOG.info("No jobs to check.");
+ }
+ return new ArrayList<String>();
+ }
+
+ /**
+ * Check to see if a job is more than maxage old, and delete it if so.
+ */
+ public boolean checkAndDelete(String node, ZooKeeper zk) {
+ JobState state = null;
+ try {
+ JobStateTracker tracker = new JobStateTracker(node, zk, true,
+ appConf.get(TempletonStorage.STORAGE_ROOT +
+ ZooKeeperStorage.TRACKINGDIR));
+ long now = new Date().getTime();
+ state = new JobState(tracker.getJobID(), appConf);
+
+ // Set the default to 0 -- if the created date is null, there was
+ // an error in creation, and we want to delete it anyway.
+ long then = 0;
+ if (state.getCreated() != null) {
+ then = state.getCreated();
+ }
+ if (now - then > maxage) {
+ LOG.info("Deleting " + tracker.getJobID());
+ state.delete();
+ tracker.delete();
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ LOG.info("checkAndDelete failed for " + node);
+ // We don't throw a new exception for this -- just keep going with the
+ // next one.
+ return true;
+ } finally {
+ if (state != null) {
+ try {
+ state.close();
+ } catch (IOException e) {
+ LOG.info("Couldn't close job state.");
+ }
+ }
+ }
+ }
+
+ // Handle to stop this process from the outside if needed.
+ public void exit() {
+ stop = true;
+ }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * A storage implementation based on storing everything in ZooKeeper.
+ * This keeps everything in a central location that is guaranteed
+ * to be available and accessible.
+ *
+ * Data is stored with each key/value pair being a node in ZooKeeper.
+ */
+public class ZooKeeperStorage implements TempletonStorage {
+
+ public static final String TRACKINGDIR = "/created";
+
+ // Locations for each of the storage types
+ public String storage_root = null;
+ public String job_path = null;
+ public String job_trackingpath = null;
+ public String overhead_path = null;
+
+ public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
+ public static final String ZK_SESSION_TIMEOUT
+ = "templeton.zookeeper.session-timeout";
+
+ public static final String ENCODING = "UTF-8";
+
+ private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class);
+
+ private ZooKeeper zk;
+
+ /**
+ * Open a ZooKeeper connection for the JobState.
+ */
+ public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout)
+ throws IOException
+ {
+ return new ZooKeeper(zkHosts,
+ zkSessionTimeout,
+ new Watcher() {
+ @Override
+ synchronized public void process(WatchedEvent event) {
+ }
+ });
+ }
+
+ /**
+ * Open a ZooKeeper connection for the JobState.
+ */
+ public static ZooKeeper zkOpen(Configuration conf)
+ throws IOException
+ {
+ return zkOpen(conf.get(ZK_HOSTS),
+ conf.getInt(ZK_SESSION_TIMEOUT, 30000));
+ }
+
+ public ZooKeeperStorage() {
+ // No-op -- this is needed to be able to instantiate the
+ // class from the name.
+ }
+
+ /**
+ * Close this ZK connection.
+ */
+ public void close()
+ throws IOException
+ {
+ if (zk != null) {
+ try {
+ zk.close();
+ zk = null;
+ } catch (InterruptedException e) {
+ throw new IOException("Closing ZooKeeper connection", e);
+ }
+ }
+ }
+
+ public void startCleanup(Configuration config) {
+ try {
+ ZooKeeperCleanup.startInstance(config);
+ } catch (Exception e) {
+ LOG.warn("Cleanup instance didn't start.");
+ }
+ }
+
+ /**
+ * Create a node in ZooKeeper
+ */
+ public void create(Type type, String id)
+ throws IOException
+ {
+ try {
+ String[] paths = getPaths(makeZnode(type, id));
+ boolean wasCreated = false;
+ for (String znode : paths) {
+ try {
+ zk.create(znode, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ wasCreated = true;
+ } catch (KeeperException.NodeExistsException e) {
+ }
+ }
+ if (wasCreated) {
+ try {
+ // Really not sure if this should go here. Will have
+ // to see how the storage mechanism evolves.
+ if (type.equals(Type.JOB)) {
+ JobStateTracker jt = new JobStateTracker(id, zk, false,
+ job_trackingpath);
+ jt.create();
+ }
+ } catch (Exception e) {
+ LOG.warn("Error tracking: " + e.getMessage());
+ // If we couldn't create the tracker node, don't
+ // create the main node.
+ zk.delete(makeZnode(type, id), -1);
+ }
+ }
+ if (zk.exists(makeZnode(type, id), false) == null)
+ throw new IOException("Unable to create " + makeZnode(type, id));
+ if (wasCreated) {
+ try {
+ saveField(type, id, "created",
+ Long.toString(System.currentTimeMillis()));
+ } catch (NotFoundException nfe) {
+ // Wow, something's really wrong.
+ throw new IOException("Couldn't write to node " + id, nfe);
+ }
+ }
+ } catch (KeeperException e) {
+ throw new IOException("Creating " + id, e);
+ } catch (InterruptedException e) {
+ throw new IOException("Creating " + id, e);
+ }
+ }
+
+ /**
+ * Get the path based on the job type.
+ *
+ * @param type
+ */
+ public String getPath(Type type) {
+ String typepath = overhead_path;
+ switch (type) {
+ case JOB:
+ typepath = job_path;
+ break;
+ case JOBTRACKING:
+ typepath = job_trackingpath;
+ break;
+ }
+ return typepath;
+ }
+
+ public static String[] getPaths(String fullpath) {
+ ArrayList<String> paths = new ArrayList<String>();
+ if (fullpath.length() < 2) {
+ paths.add(fullpath);
+ } else {
+ int location = 0;
+ while ((location = fullpath.indexOf("/", location + 1)) > 0) {
+ paths.add(fullpath.substring(0, location));
+ }
+ paths.add(fullpath);
+ }
+ String[] strings = new String[paths.size()];
+ return paths.toArray(strings);
+ }
+
+ /**
+ * A helper method that sets a field value.
+ * @param type
+ * @param id
+ * @param name
+ * @param val
+ * @throws KeeperException
+ * @throws UnsupportedEncodingException
+ * @throws InterruptedException
+ */
+ private void setFieldData(Type type, String id, String name, String val)
+ throws KeeperException, UnsupportedEncodingException, InterruptedException
+ {
+ try {
+ zk.create(makeFieldZnode(type, id, name),
+ val.getBytes(ENCODING),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch(KeeperException.NodeExistsException e) {
+ zk.setData(makeFieldZnode(type, id, name),
+ val.getBytes(ENCODING),
+ -1);
+ }
+ }
+
+ /**
+ * Make a ZK path to the named field.
+ */
+ public String makeFieldZnode(Type type, String id, String name) {
+ return makeZnode(type, id) + "/" + name;
+ }
+
+ /**
+ * Make a ZK path to job
+ */
+ public String makeZnode(Type type, String id) {
+ return getPath(type) + "/" + id;
+ }
+
+ @Override
+ public void saveField(Type type, String id, String key, String val)
+ throws NotFoundException {
+ try {
+ if (val != null) {
+ create(type, id);
+ setFieldData(type, id, key, val);
+ }
+ } catch(Exception e) {
+ throw new NotFoundException("Writing " + key + ": " + val + ", "
+ + e.getMessage());
+ }
+ }
+
+ @Override
+ public String getField(Type type, String id, String key) {
+ try {
+ byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null);
+ return new String(b, ENCODING);
+ } catch(Exception e) {
+ return null;
+ }
+ }
+
+ @Override
+ public Map<String, String> getFields(Type type, String id) {
+ HashMap<String, String> map = new HashMap<String, String>();
+ try {
+ for (String node: zk.getChildren(makeZnode(type, id), false)) {
+ byte[] b = zk.getData(makeFieldZnode(type, id, node),
+ false, null);
+ map.put(node, new String(b, ENCODING));
+ }
+ } catch(Exception e) {
+ return map;
+ }
+ return map;
+ }
+
+ @Override
+ public boolean delete(Type type, String id) throws NotFoundException {
+ try {
+ for (String child : zk.getChildren(makeZnode(type, id), false)) {
+ try {
+ zk.delete(makeFieldZnode(type, id, child), -1);
+ } catch (Exception e) {
+ // Other nodes may be trying to delete this at the same time,
+ // so just log errors and skip them.
+ throw new NotFoundException("Couldn't delete " +
+ makeFieldZnode(type, id, child));
+ }
+ }
+ try {
+ zk.delete(makeZnode(type, id), -1);
+ } catch (Exception e) {
+ // Same thing -- might be deleted by other nodes, so just go on.
+ throw new NotFoundException("Couldn't delete " +
+ makeZnode(type, id));
+ }
+ } catch (Exception e) {
+ // Error getting children of node -- probably node has been deleted
+ throw new NotFoundException("Couldn't get children of " +
+ makeZnode(type, id));
+ }
+ return true;
+ }
+
+ @Override
+ public List<String> getAll() {
+ ArrayList<String> allNodes = new ArrayList<String>();
+ for (Type type: Type.values()) {
+ allNodes.addAll(getAllForType(type));
+ }
+ return allNodes;
+ }
+
+ @Override
+ public List<String> getAllForType(Type type) {
+ try {
+ return zk.getChildren(getPath(type), false);
+ } catch (Exception e) {
+ return new ArrayList<String>();
+ }
+ }
+
+ @Override
+ public List<String> getAllForKey(String key, String value) {
+ ArrayList<String> allNodes = new ArrayList<String>();
+ try {
+ for (Type type: Type.values()) {
+ allNodes.addAll(getAllForTypeAndKey(type, key, value));
+ }
+ } catch (Exception e) {
+ LOG.info("Couldn't find children.");
+ }
+ return allNodes;
+ }
+
+ @Override
+ public List<String> getAllForTypeAndKey(Type type, String key, String value) {
+ ArrayList<String> allNodes = new ArrayList<String>();
+ try {
+ for (String id : zk.getChildren(getPath(type), false)) {
+ for (String field : zk.getChildren(id, false)) {
+ if (field.endsWith("/" + key)) {
+ byte[] b = zk.getData(field, false, null);
+ if (new String(b, ENCODING).equals(value)) {
+ allNodes.add(id);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ // Log and go to the next type -- this one might not exist
+ LOG.info("Couldn't find children of " + getPath(type));
+ }
+ return allNodes;
+ }
+
+ @Override
+ public void openStorage(Configuration config) throws IOException {
+ storage_root = config.get(STORAGE_ROOT);
+ job_path = storage_root + "/jobs";
+ job_trackingpath = storage_root + TRACKINGDIR;
+ overhead_path = storage_root + "/overhead";
+
+ if (zk == null) {
+ zk = zkOpen(config);
+ }
+ }
+
+ @Override
+ public void closeStorage() throws IOException {
+ close();
+ }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.hcatalog.templeton.ColumnDesc;
+import org.apache.hcatalog.templeton.TableDesc;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * TestDesc - Test that the desc objects are correctly converted to
+ * and from json. This also sets every field of the TableDesc object.
+ */
+public class TestDesc extends TestCase {
+    /** One mapper reused for every conversion in this test. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * Round-trip a fully populated TableDesc through json and check that
+     * the decoded copy equals the original.
+     */
+    public void testTableDesc()
+        throws Exception
+    {
+        TableDesc td = buildTableDesc();
+        assertNotNull(td);
+
+        String json = toJson(td);
+        // Sanity check that the populated description actually produced output.
+        assertTrue(json.length() > 100);
+
+        TableDesc tdCopy = fromJson(json, TableDesc.class);
+        assertEquals(td, tdCopy);
+    }
+
+    /** Build a TableDesc with every field set. */
+    private TableDesc buildTableDesc() {
+        TableDesc x = new TableDesc();
+        x.group = "staff";
+        x.permissions = "755";
+        x.external = true;
+        x.ifNotExists = true;
+        x.table = "a_table";
+        x.comment = "a comment";
+        x.columns = buildColumns();
+        x.partitionedBy = buildPartitionedBy();
+        x.clusteredBy = buildClusterBy();
+        x.format = buildStorageFormat();
+        x.location = "hdfs://localhost:9000/user/me/a_table";
+        x.tableProperties = buildGenericProperties();
+        return x;
+    }
+
+    /** Three columns, one of them without a comment. */
+    public List<ColumnDesc> buildColumns() {
+        ArrayList<ColumnDesc> x = new ArrayList<ColumnDesc>();
+        x.add(new ColumnDesc("id", "bigint", null));
+        x.add(new ColumnDesc("price", "float", "The unit price"));
+        x.add(new ColumnDesc("name", "string", "The item name"));
+        return x;
+    }
+
+    /** A single partition column. */
+    public List<ColumnDesc> buildPartitionedBy() {
+        ArrayList<ColumnDesc> x = new ArrayList<ColumnDesc>();
+        x.add(new ColumnDesc("country", "string", "The country of origin"));
+        return x;
+    }
+
+    /** Cluster by "id" into 16 buckets, sorted ascending by "id". */
+    public TableDesc.ClusteredByDesc buildClusterBy() {
+        TableDesc.ClusteredByDesc x = new TableDesc.ClusteredByDesc();
+        x.columnNames = new ArrayList<String>();
+        x.columnNames.add("id");
+        x.sortedBy = buildSortedBy();
+        x.numberOfBuckets = 16;
+        return x;
+    }
+
+    /** A single ascending sort on "id". */
+    public List<TableDesc.ClusterSortOrderDesc> buildSortedBy() {
+        ArrayList<TableDesc.ClusterSortOrderDesc> x
+            = new ArrayList<TableDesc.ClusterSortOrderDesc>();
+        x.add(new TableDesc.ClusterSortOrderDesc("id", TableDesc.SortDirectionDesc.ASC));
+        return x;
+    }
+
+    /** Storage format with row format, storedAs and storedBy all set. */
+    public TableDesc.StorageFormatDesc buildStorageFormat() {
+        TableDesc.StorageFormatDesc x = new TableDesc.StorageFormatDesc();
+        x.rowFormat = buildRowFormat();
+        x.storedAs = "rcfile";
+        x.storedBy = buildStoredBy();
+        return x;
+    }
+
+    /** Row format using control characters as delimiters, plus a serde. */
+    public TableDesc.RowFormatDesc buildRowFormat() {
+        TableDesc.RowFormatDesc x = new TableDesc.RowFormatDesc();
+        x.fieldsTerminatedBy = "\u0001";
+        x.collectionItemsTerminatedBy = "\u0002";
+        x.mapKeysTerminatedBy = "\u0003";
+        x.linesTerminatedBy = "\u0004";
+        x.serde = buildSerde();
+        return x;
+    }
+
+    /** Serde description with one property. */
+    public TableDesc.SerdeDesc buildSerde() {
+        TableDesc.SerdeDesc x = new TableDesc.SerdeDesc();
+        x.name = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
+        x.properties = new HashMap<String, String>();
+        x.properties.put("field.delim", ",");
+        return x;
+    }
+
+    /** Storage handler description with two properties. */
+    public TableDesc.StoredByDesc buildStoredBy() {
+        TableDesc.StoredByDesc x = new TableDesc.StoredByDesc();
+        x.className = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
+        x.properties = new HashMap<String, String>();
+        x.properties.put("hbase.columns.mapping", "cf:string");
+        x.properties.put("hbase.table.name", "hbase_table_0");
+        return x;
+    }
+
+    /** Arbitrary table properties. */
+    public Map<String, String> buildGenericProperties() {
+        HashMap<String, String> x = new HashMap<String, String>();
+        x.put("carmas", "evil");
+        x.put("rachel", "better");
+        x.put("ctdean", "angelic");
+        x.put("paul", "dangerously unbalanced");
+        x.put("dra", "organic");
+        return x;
+    }
+
+    /** Serialize obj to a json string. */
+    private String toJson(Object obj)
+        throws Exception
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        MAPPER.writeValue(out, obj);
+        // Jackson encodes UTF-8 when writing to an OutputStream; decode with
+        // the same charset instead of the platform default.
+        return out.toString("UTF-8");
+    }
+
+    /** Parse json into an instance of klass. */
+    private <T> T fromJson(String json, Class<T> klass)
+        throws Exception
+    {
+        return MAPPER.readValue(json, klass);
+    }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton;
+
+import junit.framework.TestCase;
+
+import org.apache.hcatalog.templeton.Main;
+import org.apache.hcatalog.templeton.mock.MockServer;
+import java.util.List;
+
+/*
+ * Test that the server code exists, and responds to basic requests.
+ */
+public class TestServer extends TestCase {
+
+ MockServer server;
+
+ public void setUp() {
+ new Main(null); // Initialize the config
+ server = new MockServer();
+ }
+
+ public void testServer() {
+ assertNotNull(server);
+ }
+
+ public void testStatus() {
+ assertEquals(server.status().get("status"), "ok");
+ }
+
+ public void testVersions() {
+ assertEquals(server.version().get("version"), "v1");
+ }
+
+ public void testFormats() {
+ assertEquals(1, server.requestFormats().size());
+ assertEquals( ((List)server.requestFormats().get("responseTypes")).get(0), "application/json");
+ }
+}
Added: incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.mock;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.exec.ExecuteException;
+import org.apache.hcatalog.templeton.ExecBean;
+import org.apache.hcatalog.templeton.ExecService;
+import org.apache.hcatalog.templeton.NotAuthorizedException;
+
+/**
+ * Test double for ExecService that executes nothing: the returned bean
+ * carries the program name in stdout and the argument list in stderr.
+ */
+public class MockExecService implements ExecService {
+
+    @Override
+    public ExecBean run(String program, List<String> args,
+                        Map<String, String> env) {
+        return echo(program, args);
+    }
+
+    @Override
+    public ExecBean runUnlimited(String program,
+                                 List<String> args, Map<String, String> env)
+        throws NotAuthorizedException, ExecuteException, IOException {
+        // Return the populated bean, matching run(), so callers get a usable result.
+        return echo(program, args);
+    }
+
+    /** Build a bean recording program in stdout and args in stderr; env is ignored. */
+    private ExecBean echo(String program, List<String> args) {
+        ExecBean bean = new ExecBean();
+        bean.stdout = program;
+        bean.stderr = args.toString();
+        return bean;
+    }
+}