You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:29 UTC

[42/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index f7e3e3a..6f96e1d 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RawStoreProxy;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
@@ -86,23 +88,17 @@ public class DbNotificationListener extends MetaStoreEventListener {
   // HiveConf rather than a Configuration.
   private HiveConf hiveConf;
   private MessageFactory msgFactory;
-  private RawStore rs;
-
-  private synchronized void init(HiveConf conf) {
-    try {
-      rs = RawStoreProxy.getProxy(conf, conf,
-          conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999);
-    } catch (MetaException e) {
-      LOG.error("Unable to connect to raw store, notifications will not be tracked", e);
-      rs = null;
-    }
-    if (cleaner == null && rs != null) {
-      cleaner = new CleanerThread(conf, rs);
+
+  private synchronized void init(HiveConf conf) throws MetaException {
+    if (cleaner == null) {
+      cleaner =
+          new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf,
+              conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999));
       cleaner.start();
     }
   }
 
-  public DbNotificationListener(Configuration config) {
+  public DbNotificationListener(Configuration config) throws MetaException {
     super(config);
     // The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor
     // with a Configuration parameter, so we have to declare config as Configuration.  But it
@@ -142,7 +138,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   /**
@@ -157,7 +153,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   /**
@@ -170,10 +166,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
     Table after = tableEvent.getNewTable();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
-            .buildAlterTableMessage(before, after).toString());
+            .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString());
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   class FileIterator implements Iterator<String> {
@@ -281,7 +277,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -296,7 +292,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -309,10 +305,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
     Partition after = partitionEvent.getNewPartition();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
-            .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
+            .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString());
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -326,7 +322,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
             .buildCreateDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event);
+    process(event, dbEvent);
   }
 
   /**
@@ -340,7 +336,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
             .buildDropDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event);
+    process(event, dbEvent);
   }
 
   /**
@@ -354,7 +350,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
             .buildCreateFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event);
+    process(event, fnEvent);
   }
 
   /**
@@ -368,7 +364,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
             .buildDropFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event);
+    process(event, fnEvent);
   }
 
   /**
@@ -382,7 +378,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
             .buildCreateIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   /**
@@ -396,7 +392,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
             .buildDropIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   /**
@@ -411,7 +407,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
             .buildAlterIndexMessage(before, after).toString());
     event.setDbName(before.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   class FileChksumIterator implements Iterator<String> {
@@ -443,12 +439,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
   public void onInsert(InsertEvent insertEvent) throws MetaException {
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
-            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
+            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
             new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
             .toString());
     event.setDbName(insertEvent.getDb());
     event.setTableName(insertEvent.getTable());
-    process(event);
+    process(event, insertEvent);
   }
 
   /**
@@ -472,18 +468,27 @@ public class DbNotificationListener extends MetaStoreEventListener {
     return (int)millis;
   }
 
-  // Process this notification by adding it to metastore DB
-  private void process(NotificationEvent event) {
+  /**
+   * Process this notification by adding it to metastore DB.
+   *
+   * @param event NotificationEvent is the object written to the metastore DB.
+   * @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set the
+   *                      DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
+   */
+  private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
     event.setMessageFormat(msgFactory.getMessageFormat());
-    if (rs != null) {
-      synchronized (NOTIFICATION_TBL_LOCK) {
-        LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
-            event.getMessage());
-        rs.addNotificationEvent(event);
-      }
-    } else {
-      LOG.warn("Dropping event " + event + " since notification is not running.");
+    synchronized (NOTIFICATION_TBL_LOCK) {
+      LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
+          event.getMessage());
+      HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
     }
+
+      // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
+      if (event.isSetEventId()) {
+        listenerEvent.putParameter(
+            MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+            Long.toString(event.getEventId()));
+      }
   }
 
   private static class CleanerThread extends Thread {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
new file mode 100644
index 0000000..a4f2d59
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+/**
+ * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent
+ * parameters.
+ */
+public class MetaStoreEventListenerConstants {
+  /*
+   * DbNotificationListener keys reserved for updating ListenerEvent parameters.
+   *
+   * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key will have the event identifier that DbNotificationListener
+   *                                   processed during an event. This event identifier might be shared
+   *                                   across other MetaStoreEventListener implementations.
+   */
+  public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index e765305..5bea0a6 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.hive.hcatalog</groupId>
     <artifactId>hive-hcatalog</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
new file mode 100644
index 0000000..78987ab
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming Writer handles text input data with regex. Uses
+ * org.apache.hadoop.hive.serde2.RegexSerDe
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+  private RegexSerDe serde;
+  private final StructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+  
+  /**
+   * @param endPoint the end point to write to
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, null, conn);
+  }
+  
+  /**
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, conf, conn);
+  }
+  
+  /**
+   * @param regex to parse the data
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf, conn);
+    this.serde = createSerde(tbl, conf, regex);
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      recordObjInspector = ( StructObjectInspector ) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+  
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  @Override
+  protected StructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  @Override
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+
+  @Override
+  public void write(long transactionId, byte[] record)
+          throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction("
+              + transactionId + ")", e);
+    }
+  }
+
+  /**
+   * Creates RegexSerDe
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @param regex  used to create serde
+   * @return
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+      ArrayList<String> tableColumns = getCols(tbl);
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+      RegexSerDe serde = new RegexSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
+    }
+  }
+  
+  private static ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return colNames;
+  }
+
+  /**
+   * Encode Utf8 encoded string bytes using RegexSerDe
+   * 
+   * @param utf8StrRecord
+   * @return The encoded object
+   * @throws SerializationError
+   */
+  @Override
+  public Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index bf29993..097de9b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,10 +64,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.tools.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -82,11 +78,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -485,9 +485,9 @@ public class TestStreaming {
 
     NullWritable key = rr.createKey();
     OrcStruct value = rr.createValue();
-    for (int i = 0; i < records.length; i++) {
+    for (String record : records) {
       Assert.assertEquals(true, rr.next(key, value));
-      Assert.assertEquals(records[i], value.toString());
+      Assert.assertEquals(record, value.toString());
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
@@ -741,7 +741,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -753,11 +753,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -787,6 +787,75 @@ public class TestStreaming {
   }
 
   @Test
+  public void testTransactionBatchCommit_Regex() throws Exception {
+    testTransactionBatchCommit_Regex(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_RegexUGI() throws Exception {
+    testTransactionBatchCommit_Regex(Utils.getUGI());
+  }
+  private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+
+    // data should not be visible
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+      "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+      , txnBatch.getCurrentTransactionState());
+
+
+    connection.close();
+
+
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    regex = "([^:]*):(.*)";
+    writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1:Hello streaming".getBytes());
+    txnBatch.commit();
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+  
+  @Test
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
@@ -802,7 +871,7 @@ public class TestStreaming {
     txnBatch.write(rec1.getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -929,7 +998,7 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -948,13 +1017,13 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -965,14 +1034,14 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
             "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
             "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
             "{4, Welcome to streaming - once again}");
 
@@ -1009,11 +1078,11 @@ public class TestStreaming {
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.beginNextTransaction();
     txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -1021,17 +1090,17 @@ public class TestStreaming {
     txnBatch2.beginNextTransaction();
     txnBatch2.write("4,Welcome to streaming - once again".getBytes());
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}");
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}",
         "{4, Welcome to streaming - once again}");
@@ -1700,7 +1769,7 @@ public class TestStreaming {
     txnBatch.heartbeat();//this is no-op on closed batch
     txnBatch.abort();//ditto
     GetOpenTxnsInfoResponse r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
     List<TxnInfo> ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
     Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1764,7 +1833,7 @@ public class TestStreaming {
       expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
 
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
     Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1787,7 +1856,7 @@ public class TestStreaming {
       expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
     
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
     Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/java-client/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml
index 3b53664..3bb9f4d 100644
--- a/hcatalog/webhcat/java-client/pom.xml
+++ b/hcatalog/webhcat/java-client/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.hive.hcatalog</groupId>
     <artifactId>hive-hcatalog</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
index b9cb067..86d3acb 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
@@ -434,7 +434,7 @@ public class TestHCatClient {
     HCatClient client = HCatClient.create(new Configuration(hcatConf));
     boolean isExceptionCaught = false;
     // Table creation with a long table name causes ConnectionFailureException
-    final String tableName = "Temptable" + new BigInteger(200, new Random()).toString(2);
+    final String tableName = "Temptable" + new BigInteger(260, new Random()).toString(2);
 
     ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
     cols.add(new HCatFieldSchema("id", Type.INT, "id columns"));

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/pom.xml b/hcatalog/webhcat/svr/pom.xml
index c5ad387..a55ffe9 100644
--- a/hcatalog/webhcat/svr/pom.xml
+++ b/hcatalog/webhcat/svr/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.hive.hcatalog</groupId>
     <artifactId>hive-hcatalog</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
@@ -45,9 +45,50 @@
       <artifactId>hive-hcatalog-core</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-runner</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-sslengine</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+	  <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+	  <artifactId>jsp-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+	  <artifactId>jsp-api-2.1</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- inter-project -->
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-rewrite</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.sun.jersey</groupId>
       <artifactId>jersey-core</artifactId>
       <version>${jersey.version}</version>
@@ -93,11 +134,6 @@
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.eclipse.jetty.aggregate</groupId>
-      <artifactId>jetty-all-server</artifactId>
-      <version>${jetty.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jul-to-slf4j</artifactId>
       <version>${slf4j.version}</version>
@@ -107,7 +143,7 @@
       <artifactId>hadoop-auth</artifactId>
       <version>${hadoop.version}</version>
         <exclusions>
-             <exclusion>
+          <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
           </exclusion>
@@ -121,16 +157,42 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+	</exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- test inter-project -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 54d0907..0ea7d88 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -111,6 +111,43 @@ public class AppConfig extends Configuration {
   public static final String MR_AM_MEMORY_MB     = "templeton.mr.am.memory.mb";
   public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder";
 
+  /*
+   * These parameters controls the maximum number of concurrent job submit/status/list
+   * operations in templeton service. If more number of concurrent requests comes then
+   * they will be rejected with BusyException.
+   */
+  public static final String JOB_SUBMIT_MAX_THREADS = "templeton.parallellism.job.submit";
+  public static final String JOB_STATUS_MAX_THREADS = "templeton.parallellism.job.status";
+  public static final String JOB_LIST_MAX_THREADS = "templeton.parallellism.job.list";
+
+  /*
+   * These parameters controls the maximum time job submit/status/list operation is
+   * executed in templeton service. On time out, the execution is interrupted and
+   * TimeoutException is returned to client. On time out
+   *   For list and status operation, there is no action needed as they are read requests.
+   *   For submit operation, we do best effort to kill the job if its generated. Enabling
+   *     this parameter may have following side effects
+   *     1) There is a possibility for having active job for some time when the client gets
+   *        response for submit operation and a list operation from client could potential
+   *        show the newly created job which may eventually be killed with no guarantees.
+   *     2) If submit operation retried by client then there is a possibility of duplicate
+   *        jobs triggered.
+   *
+   * Time out configs should be configured in seconds.
+   *
+   */
+  public static final String JOB_SUBMIT_TIMEOUT   = "templeton.job.submit.timeout";
+  public static final String JOB_STATUS_TIMEOUT   = "templeton.job.status.timeout";
+  public static final String JOB_LIST_TIMEOUT   = "templeton.job.list.timeout";
+
+  /*
+   * If task execution time out is configured for submit operation then job may need to
+   * be killed on execution time out. These parameters controls the maximum number of
+   * retries and retry wait time in seconds for executing the time out task.
+   */
+  public static final String JOB_TIMEOUT_TASK_RETRY_COUNT = "templeton.job.timeout.task.retry.count";
+  public static final String JOB_TIMEOUT_TASK_RETRY_INTERVAL = "templeton.job.timeout.task.retry.interval";
+
   /**
    * see webhcat-default.xml
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
index 4b2dfec..622f92d 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,10 +42,11 @@ public class DeleteDelegator extends TempletonDelegator {
   public QueueStatusBean run(String user, String id)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException
   {
-    UserGroupInformation ugi = UgiFactory.getUgi(user);
+    UserGroupInformation ugi = null;
     WebHCatJTShim tracker = null;
     JobState state = null;
     try {
+      ugi = UgiFactory.getUgi(user);
       tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
       JobID jobid = StatusDelegator.StringToJobID(id);
       if (jobid == null)
@@ -69,6 +71,8 @@ public class DeleteDelegator extends TempletonDelegator {
         tracker.close();
       if (state != null)
         state.close();
+      if (ugi != null)
+        FileSystem.closeAllForUGI(ugi);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index f0296cb..1953028 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -49,7 +49,7 @@ public class HiveDelegator extends LauncherDelegator {
                String statusdir, String callback, String completedUrl, boolean enablelog,
                Boolean enableJobReconnect)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException
   {
     runAs = user;
     List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir,

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 84cd5b9..1246b40 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -46,7 +46,7 @@ public class JarDelegator extends LauncherDelegator {
                boolean usesHcatalog, String completedUrl,
                boolean enablelog, Boolean enableJobReconnect, JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
     runAs = user;
     List<String> args = makeArgs(jar, mainClass,
       libjars, files, jarArgs, defines,

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
new file mode 100644
index 0000000..e703eff
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class JobCallable<T> implements Callable<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(JobCallable.class);
+
+  static public enum JobState {
+    STARTED,
+    FAILED,
+    COMPLETED
+  }
+
+  /*
+   * Job state of job request. Changes to the state are synchronized using
+   * setStateAndResult. This is required due to two different threads,
+   * main thread and job execute thread, tries to change state and organize
+   * clean up tasks.
+   */
+  private JobState jobState = JobState.STARTED;
+
+  /*
+   * Result of JobCallable task after successful task completion. This is
+   * expected to be set by the thread which executes JobCallable task.
+   */
+  public T returnResult = null;
+
+  /*
+   * Sets the job state to FAILED. Returns true if FAILED status is set.
+   * Otherwise, it returns false.
+   */
+  public boolean setJobStateFailed() {
+    return setStateAndResult(JobState.FAILED, null);
+  }
+
+  /*
+   * Sets the job state to COMPLETED and also sets the results value. Returns true
+   * if COMPLETED status is set. Otherwise, it returns false.
+   */
+  public boolean setJobStateCompleted(T result) {
+    return setStateAndResult(JobState.COMPLETED, result);
+  }
+
+  /*
+   * Sets the job state and result. Returns true if status and result are set.
+   * Otherwise, it returns false.
+   */
+  private synchronized boolean setStateAndResult(JobState jobState, T result) {
+    if (this.jobState == JobState.STARTED) {
+      this.jobState = jobState;
+      this.returnResult = result;
+      return true;
+    } else {
+      LOG.info("Failed to set job state to " + jobState + " due to job state "
+                  + this.jobState + ". Expected state is " + JobState.STARTED);
+    }
+
+    return false;
+  }
+
+  /*
+   * Executes the callable task with help of execute() call and gets the result
+   * of the task. It also sets job status as COMPLETED if state is not already
+   * set to FAILED and returns result to future.
+   */
+  public T call() throws Exception {
+
+    /*
+     * Don't catch any execution exceptions here and let the caller catch it.
+     */
+    T result = this.execute();
+
+    if (!this.setJobStateCompleted(result)) {
+     /*
+      * Failed to set job status as COMPLETED which mean the main thread would have
+      * exited and not waiting for the result. Call cleanup() to execute any cleanup.
+      */
+      cleanup();
+      return null;
+    }
+
+    return this.returnResult;
+  }
+
+  /*
+   * Abstract method to be overridden for task execution.
+   */
+  public abstract T execute() throws Exception;
+
+  /*
+   * Cleanup method called to run cleanup tasks if job state is FAILED. By default,
+   * no cleanup is provided.
+   */
+  public void cleanup() {}
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
new file mode 100644
index 0000000..9ac4588
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobRequestExecutor<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(JobRequestExecutor.class);
+  private static AppConfig appConf = Main.getAppConfigInstance();
+
+  /*
+   * Thread pool to execute job requests.
+   */
+  private ThreadPoolExecutor jobExecutePool = null;
+
+  /*
+   * Type of job request.
+   */
+  private JobRequestType requestType;
+
+  /*
+   * Config name used to find the number of concurrent requests.
+   */
+  private String concurrentRequestsConfigName;
+
+  /*
+   * Config name used to find the maximum time job request can be executed.
+   */
+  private String jobTimeoutConfigName;
+
+  /*
+   * Job request execution time out in seconds. If it is 0 then request
+   * will not be timed out.
+   */
+  private int requestExecutionTimeoutInSec = 0;
+
+  /*
+   * Amount of time a thread can be alive in thread pool before cleaning this up. Core threads
+   * will not be cleanup from thread pool.
+   */
+  private int threadKeepAliveTimeInHours = 1;
+
+  /*
+   * Maximum number of times a cancel request is sent to job request execution
+   * thread. Future.cancel may not be able to interrupt the thread if it is
+   * blocked on network calls.
+   */
+  private int maxTaskCancelRetryCount = 10;
+
+  /*
+   * Wait time in milliseconds before another cancel request is made.
+   */
+  private int maxTaskCancelRetryWaitTimeInMs = 1000;
+
+  /*
+   * A flag to indicate whether to cancel the task when exception TimeoutException or
+   * InterruptedException or CancellationException raised. The default is cancel thread.
+   */
+  private boolean enableCancelTask = true;
+
+  /*
+   * Job Request type.
+   */
+  public enum JobRequestType {
+    Submit,
+    Status,
+    List
+  }
+
+  /*
+   * Creates a job request object and sets up execution environment. Creates a thread pool
+   * to execute job requests.
+   *
+   * @param requestType
+   *          Job request type
+   *
+   * @param concurrentRequestsConfigName
+   *          Config name to be used to extract number of concurrent requests to be serviced.
+   *
+   * @param jobTimeoutConfigName
+   *          Config name to be used to extract maximum time a task can execute a request.
+   *
+   * @param enableCancelTask
+   *          A flag to indicate whether to cancel the task when exception TimeoutException
+   *          or InterruptedException or CancellationException raised.
+   *
+   */
+  public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
+                            String jobTimeoutConfigName, boolean enableCancelTask) {
+
+    this.concurrentRequestsConfigName = concurrentRequestsConfigName;
+    this.jobTimeoutConfigName = jobTimeoutConfigName;
+    this.requestType = requestType;
+    this.enableCancelTask = enableCancelTask;
+
+    /*
+     * The default number of threads will be 0. That means thread pool is not used and
+     * operation is executed with the current thread.
+     */
+    int threads = !StringUtils.isEmpty(concurrentRequestsConfigName) ?
+                                appConf.getInt(concurrentRequestsConfigName, 0) : 0;
+
+    if (threads > 0) {
+      /*
+       * Create a thread pool with no queue wait time to execute the operation. This will ensure
+       * that job requests are rejected if there are already maximum number of threads busy.
+       */
+      this.jobExecutePool = new ThreadPoolExecutor(threads, threads,
+                             threadKeepAliveTimeInHours, TimeUnit.HOURS,
+                             new SynchronousQueue<Runnable>());
+       this.jobExecutePool.allowCoreThreadTimeOut(true);
+
+      /*
+       * Get the job request time out value. If this configuration value is set to 0
+       * then job request will wait until it finishes.
+       */
+      if (!StringUtils.isEmpty(jobTimeoutConfigName)) {
+        this.requestExecutionTimeoutInSec = appConf.getInt(jobTimeoutConfigName, 0);
+      }
+
+      LOG.info("Configured " + threads + " threads for job request type " + this.requestType
+                 + " with time out " + this.requestExecutionTimeoutInSec + " s.");
+    } else {
+      /*
+       * If threads are not configured then they will be executed in current thread itself.
+       */
+      LOG.info("No thread pool configured for job request type " + this.requestType);
+    }
+  }
+
+  /*
+   * Creates a job request object and sets up execution environment. Creates a thread pool
+   * to execute job requests.
+   *
+   * @param requestType
+   *          Job request type
+   *
+   * @param concurrentRequestsConfigName
+   *          Config name to be used to extract number of concurrent requests to be serviced.
+   *
+   * @param jobTimeoutConfigName
+   *          Config name to be used to extract maximum time a task can execute a request.
+   *
+   */
+  public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
+                            String jobTimeoutConfigName) {
+    this(requestType, concurrentRequestsConfigName, jobTimeoutConfigName, true);
+  }
+
+  /*
+   * Returns true of thread pool is created and can be used for executing a job request.
+   * Otherwise, returns false.
+   */
+  public boolean isThreadPoolEnabled() {
+    return this.jobExecutePool != null;
+  }
+
+  /*
+   * Executes job request operation. If thread pool is not created then job request is
+   * executed in current thread itself.
+   *
+   * @param jobExecuteCallable
+   *          Callable object to run the job request task.
+   *
+   */
+  public T execute(JobCallable<T> jobExecuteCallable) throws InterruptedException,
+                 TimeoutException, TooManyRequestsException, ExecutionException {
+    /*
+     * The callable shouldn't be null to execute. The thread pool also should be configured
+     * to execute requests.
+     */
+    assert (jobExecuteCallable != null);
+    assert (this.jobExecutePool != null);
+
+    String type = this.requestType.toString().toLowerCase();
+
+    String retryMessageForConcurrentRequests = "Please wait for some time before retrying "
+                  + "the operation. Please refer to the config " + concurrentRequestsConfigName
+                  + " to configure concurrent requests.";
+
+    LOG.debug("Starting new " + type + " job request with time out " + this.requestExecutionTimeoutInSec
+              + "seconds.");
+    Future<T> future = null;
+
+    try {
+      future = this.jobExecutePool.submit(jobExecuteCallable);
+    } catch (RejectedExecutionException rejectedException) {
+      /*
+       * Not able to find thread to execute the job request. Raise Busy exception and client
+       * can retry the operation.
+       */
+      String tooManyRequestsExceptionMessage = "Unable to service the " + type + " job request as "
+                        + "templeton service is busy with too many " + type + " job requests. "
+                        + retryMessageForConcurrentRequests;
+
+      LOG.warn(tooManyRequestsExceptionMessage);
+      throw new TooManyRequestsException(tooManyRequestsExceptionMessage);
+    }
+
+    T result = null;
+
+    try {
+      result = this.requestExecutionTimeoutInSec > 0
+                ? future.get(this.requestExecutionTimeoutInSec, TimeUnit.SECONDS) : future.get();
+    } catch (TimeoutException e) {
+      /*
+       * See if the execution thread has just completed operation and result is available.
+       * If result is available then return the result. Otherwise, raise exception.
+       */
+      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
+        String message = this.requestType + " job request got timed out. Please wait for some time "
+                       + "before retrying the operation. Please refer to the config "
+                       + jobTimeoutConfigName + " to configure job request time out.";
+        LOG.warn(message);
+
+        /*
+         * Throw TimeoutException to caller.
+         */
+        throw new TimeoutException(message);
+      }
+    } catch (InterruptedException e) {
+      /*
+       * See if the execution thread has just completed operation and result is available.
+       * If result is available then return the result. Otherwise, raise exception.
+       */
+      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
+        String message = this.requestType + " job request got interrupted. Please wait for some time "
+                       + "before retrying the operation.";
+        LOG.warn(message);
+
+        /*
+         * Throw TimeoutException to caller.
+         */
+        throw new InterruptedException(message);
+      }
+    } catch (CancellationException e) {
+      /*
+       * See if the execution thread has just completed operation and result is available.
+       * If result is available then return the result. Otherwise, raise exception.
+       */
+      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
+        String message = this.requestType + " job request got cancelled and thread got interrupted. "
+                       + "Please wait for some time before retrying the operation.";
+        LOG.warn(message);
+
+        throw new InterruptedException(message);
+      }
+    } finally {
+      /*
+       * If the thread is still active and needs to be cancelled then cancel it. This may
+       * happen in case task got interrupted, or timed out.
+       */
+      if (enableCancelTask) {
+        cancelExecutePoolThread(future);
+      }
+    }
+
+    LOG.debug("Completed " + type + " job request.");
+
+    return result;
+  }
+
+  /*
+   * Initiate cancel request to cancel the thread execution and interrupt the thread.
+   * If thread interruption is not handled by jobExecuteCallable then thread may continue
+   * running to completion. The cancel call may fail for some scenarios. In that case,
+   * retry the cancel call until it returns true or max retry count is reached.
+   *
+   * @param future
+   *          Future object which has handle to cancel the thread.
+   *
+   */
+  private void cancelExecutePoolThread(Future<T> future) {
+    int retryCount = 0;
+    while(retryCount < this.maxTaskCancelRetryCount && !future.isDone()) {
+      LOG.info("Task is still executing the job request. Cancelling it with retry count: "
+               + retryCount);
+      if (future.cancel(true)) {
+        /*
+         * Cancelled the job request and return to client.
+         */
+        LOG.info("Cancel job request issued successfully.");
+        return;
+      }
+
+      retryCount++;
+      try {
+        Thread.sleep(this.maxTaskCancelRetryWaitTimeInMs);
+      } catch (InterruptedException e) {
+        /*
+         * Nothing to do. Just retry.
+         */
+      }
+    }
+
+    LOG.warn("Failed to cancel the job. isCancelled: " + future.isCancelled()
+                    + " Retry count: " + retryCount);
+  }
+
+  /*
+   * Tries to get the job result if job request is completed. Otherwise it sets job status
+   * to FAILED such that execute thread can do necessary clean up based on FAILED state.
+   */
+  private T tryGetJobResultOrSetJobStateFailed(JobCallable<T> jobExecuteCallable) {
+    if (!jobExecuteCallable.setJobStateFailed()) {
+      LOG.info("Job is already COMPLETED. Returning the result.");
+      return jobExecuteCallable.returnResult;
+    } else {
+      LOG.info("Job status set to FAILED. Job clean up to be done by execute thread "
+              + "after job request is executed.");
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index b3f44a2..9bea897 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -23,16 +23,19 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.templeton.tool.JobState;
 import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
@@ -50,9 +53,26 @@ public class LauncherDelegator extends TempletonDelegator {
   static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP}
   private boolean secureMeatastoreAccess = false;
   private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*";
+  private final String JOB_SUBMIT_EXECUTE_THREAD_PREFIX = "JobSubmitExecute";
+  private final int jobTimeoutTaskRetryCount;
+  private final int jobTimeoutTaskRetryIntervalInSec;
+
+  /**
+   * Current thread used to set in execution threads.
+   */
+  private final String submitThreadId = Thread.currentThread().getName();
+
+  /**
+   * Job request executor to submit job requests.
+   */
+  private static JobRequestExecutor<EnqueueBean> jobRequest =
+                   new JobRequestExecutor<EnqueueBean>(JobRequestExecutor.JobRequestType.Submit,
+                   AppConfig.JOB_SUBMIT_MAX_THREADS, AppConfig.JOB_SUBMIT_TIMEOUT, false);
 
   public LauncherDelegator(AppConfig appConf) {
     super(appConf);
+    jobTimeoutTaskRetryCount = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 0);
+    jobTimeoutTaskRetryIntervalInSec = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 0);
   }
 
   public void registerJob(String id, String user, String callback,
@@ -70,19 +90,102 @@ public class LauncherDelegator extends TempletonDelegator {
     }
   }
 
+  /*
+   * Submit job request. If maximum concurrent job submit requests are configured then submit
+   * request will be executed on a thread from thread pool. If job submit request time out is
+   * configured then request execution thread will be interrupted if thread times out. Also
+   * does best efforts to identify if job is submitted and kill it quietly.
+   */
+  public EnqueueBean enqueueController(final String user, final Map<String, Object> userArgs,
+                     final String callback, final List<String> args)
+    throws NotAuthorizedException, BusyException, IOException, QueueException, TooManyRequestsException {
+
+    EnqueueBean bean = null;
+    final TempletonControllerJob controllerJob = getTempletonController();
+
+    if (jobRequest.isThreadPoolEnabled()) {
+      JobCallable<EnqueueBean> jobExecuteCallable = getJobSubmitTask(user, userArgs, callback,
+                                                                     args, controllerJob);
+      try {
+        bean = jobRequest.execute(jobExecuteCallable);
+      } catch (TimeoutException ex) {
+       /*
+        * Job request got timed out. Job kill should have started. Return to client with
+        * QueueException.
+        */
+        throw new QueueException(ex.getMessage());
+      } catch (InterruptedException ex) {
+       /*
+        * Job request got interrupted. Job kill should have started. Return to client with
+        * with QueueException.
+        */
+        throw new QueueException(ex.getMessage());
+      } catch (ExecutionException ex) {
+        /*
+         * ExecutionException is raised if job execution gets an exception. Return to client
+         * with the exception.
+         */
+        throw new QueueException(ex.getMessage());
+      }
+    } else {
+      LOG.info("No thread pool configured for submit job request. Executing "
+                      + "the job request in current thread.");
+
+      bean = enqueueJob(user, userArgs, callback, args, controllerJob);
+    }
+
+    return bean;
+  }
+
+  /*
+   * Job callable task for job submit operation. Overrides behavior of execute()
+   * to submit job. Also, overrides the behavior of cleanup() to kill the job in case
+   * job submission request is timed out or interrupted.
+   */
+  private JobCallable<EnqueueBean> getJobSubmitTask(final String user,
+                     final Map<String, Object> userArgs, final String callback,
+                     final List<String> args, final TempletonControllerJob controllerJob) {
+      return new JobCallable<EnqueueBean>() {
+        @Override
+        public EnqueueBean execute() throws NotAuthorizedException, BusyException, IOException,
+                                       QueueException {
+         /*
+          * Change the current thread name to include parent thread Id if it is executed
+          * in thread pool. Useful to extract logs specific to a job request and helpful
+          * to debug job issues.
+          */
+          Thread.currentThread().setName(String.format("%s-%s-%s", JOB_SUBMIT_EXECUTE_THREAD_PREFIX,
+                                       submitThreadId, Thread.currentThread().getId()));
+
+          return enqueueJob(user, userArgs, callback, args, controllerJob);
+        }
+
+        @Override
+        public void cleanup() {
+          /*
+           * Failed to set job status as COMPLETED which mean the main thread would have
+           * exited and not waiting for the result. Kill the submitted job.
+           */
+          LOG.info("Job kill not done by main thread. Trying to kill now.");
+          killTempletonJobWithRetry(user, controllerJob.getSubmittedId());
+        }
+      };
+  }
+
   /**
    * Enqueue the TempletonControllerJob directly calling doAs.
    */
-  public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback,
-                     List<String> args)
+  public EnqueueBean enqueueJob(String user, Map<String, Object> userArgs, String callback,
+                     List<String> args, TempletonControllerJob controllerJob)
     throws NotAuthorizedException, BusyException,
     IOException, QueueException {
+    UserGroupInformation ugi = null;
     try {
-      UserGroupInformation ugi = UgiFactory.getUgi(user);
+      ugi = UgiFactory.getUgi(user);
 
       final long startTime = System.nanoTime();
 
-      String id = queueAsUser(ugi, args);
+      String id = queueAsUser(ugi, args, controllerJob);
 
       long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
       LOG.debug("queued job " + id + " in " + elapsed + " ms");
@@ -96,24 +199,91 @@ public class LauncherDelegator extends TempletonDelegator {
       return new EnqueueBean(id);
     } catch (InterruptedException e) {
       throw new QueueException("Unable to launch job " + e);
+    } finally {
+      if (ugi != null) {
+        FileSystem.closeAllForUGI(ugi);
+      }
     }
   }
 
-  private String queueAsUser(UserGroupInformation ugi, final List<String> args)
+  private String queueAsUser(UserGroupInformation ugi, final List<String> args,
+                            final TempletonControllerJob controllerJob)
     throws IOException, InterruptedException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Launching job: " + args);
     }
     return ugi.doAs(new PrivilegedExceptionAction<String>() {
       public String run() throws Exception {
-        String[] array = new String[args.size()];
-        TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess, appConf);
-        ToolRunner.run(ctrl, args.toArray(array));
-        return ctrl.getSubmittedId();
+        runTempletonControllerJob(controllerJob, args);
+        return controllerJob.getSubmittedId();
       }
     });
   }
 
+  /*
+   * Kills templeton job with multiple retries if job exists. Returns true if kill job
+   * attempt is success. Otherwise returns false.
+   */
+  private boolean killTempletonJobWithRetry(String user, String jobId) {
+    /*
+     * Make null safe Check if the job submission has gone through and if job is valid.
+     */
+    if (StringUtils.startsWith(jobId, "job_")) {
+      LOG.info("Started killing the job " + jobId);
+
+      boolean success = false;
+      int count = 0;
+      do {
+        try {
+          count++;
+          killJob(user, jobId);
+          success = true;
+          LOG.info("Kill job attempt succeeded.");
+         } catch (Exception e) {
+          LOG.info("Failed to kill the job due to exception: " + e.getMessage());
+          LOG.info("Waiting for " + jobTimeoutTaskRetryIntervalInSec + "s before retrying "
+                       + "the operation. Iteration: " + count);
+          try {
+            Thread.sleep(jobTimeoutTaskRetryIntervalInSec * 1000);
+          } catch (InterruptedException ex) {
+            LOG.info("Got interrupted while waiting for next retry.");
+          }
+        }
+      } while (!success && count < jobTimeoutTaskRetryCount);
+
+      return success;
+    } else {
+      LOG.info("Couldn't find a valid job id after job request is timed out.");
+      return false;
+    }
+  }
+
+  /*
+   * Gets new templeton controller objects.
+   */
+  protected TempletonControllerJob getTempletonController() {
+    return new TempletonControllerJob(secureMeatastoreAccess, appConf);
+  }
+
+  /*
+   * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run
+   * the actual job.
+   */
+  protected int runTempletonControllerJob(TempletonControllerJob controllerJob, List<String> args)
+    throws IOException, InterruptedException, TimeoutException, Exception {
+    String[] array = new String[args.size()];
+    return ToolRunner.run(controllerJob, args.toArray(array));
+  }
+
+  /*
+   * Uses DeleteDelegator to kill a job and ignores all exceptions.
+   */
+  protected void killJob(String user, String jobId)
+  throws NotAuthorizedException, BadParam, IOException, InterruptedException {
+    DeleteDelegator d = new DeleteDelegator(appConf);
+    d.run(user, jobId);
+  }
+
   public List<String> makeLauncherArgs(AppConfig appConf, String statusdir,
                      String completedUrl,
                      List<String> copyFiles,
@@ -180,24 +350,35 @@ public class LauncherDelegator extends TempletonDelegator {
    */
   private String getShimLibjars() {
     WebHCatJTShim shim = null;
+    UserGroupInformation ugi = null;
     try {
-      shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, UserGroupInformation.getCurrentUser());
+      ugi = UserGroupInformation.getCurrentUser();
+      shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
+
+      // Besides the HiveShims jar which is Hadoop version dependent we also
+      // always need to include hive shims common jars.
+      Path shimCommonJar = new Path(
+          TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
+      Path shimCommonSecureJar = new Path(
+          TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
+      Path shimJar = new Path(
+          TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
+
+      return String.format(
+          "%s,%s,%s",
+          shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get WebHCatShim", e);
+      throw new RuntimeException("Failed to get shimLibJars", e);
+    } finally {
+      try {
+        if (ugi != null) {
+          FileSystem.closeAllForUGI(ugi);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to closeAllForUGI", e);
+      }
     }
 
-    // Besides the HiveShims jar which is Hadoop version dependent we also
-    // always need to include hive shims common jars.
-    Path shimCommonJar = new Path(
-        TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
-    Path shimCommonSecureJar = new Path(
-        TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
-    Path shimJar = new Path(
-        TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
-
-    return String.format(
-        "%s,%s,%s",
-        shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
   }
 
   // Storage vars
@@ -263,7 +444,7 @@ public class LauncherDelegator extends TempletonDelegator {
   }
   /**
    * This is called by subclasses when they determined that the sumbmitted job requires
-   * metastore access (e.g. Pig job that uses HCatalog).  This then determines if 
+   * metastore access (e.g. Pig job that uses HCatalog).  This then determines if
    * secure access is required and causes TempletonControllerJob to set up a delegation token.
    * @see TempletonControllerJob
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
index a30ecd1..dfa59f8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
@@ -19,9 +19,15 @@
 package org.apache.hive.hcatalog.templeton;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobStatus;
@@ -31,20 +37,82 @@ import org.apache.hadoop.security.UserGroupInformation;
  * List jobs owned by a user.
  */
 public class ListDelegator extends TempletonDelegator {
+  private static final Log LOG = LogFactory.getLog(ListDelegator.class);
+  private final String JOB_LIST_EXECUTE_THREAD_PREFIX = "JobListExecute";
+
+  /**
+   * Current thread id used to set in execution threads.
+   */
+  private final String listThreadId = Thread.currentThread().getName();
+
+  /*
+   * Job request executor to list job status requests.
+   */
+  private static JobRequestExecutor<List<JobItemBean>> jobRequest =
+                   new JobRequestExecutor<List<JobItemBean>>(JobRequestExecutor.JobRequestType.List,
+                   AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT);
+
   public ListDelegator(AppConfig appConf) {
     super(appConf);
   }
 
-  public List<String> run(String user, boolean showall)
+  /*
+   * List status jobs request. If maximum concurrent job list requests are configured then
+   * list request will be executed on a thread from thread pool. If job list request time out
+   * is configured then request execution thread will be interrupted if thread times out and
+   * does no action.
+   */
+  public List<JobItemBean> run(final String user, final boolean showall, final String jobId,
+                               final int numRecords, final boolean showDetails)
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException,
+           TimeoutException, ExecutionException, TooManyRequestsException {
+
+    if (jobRequest.isThreadPoolEnabled()) {
+      return jobRequest.execute(getJobListTask(user, showall, jobId,numRecords, showDetails));
+    } else {
+      return listJobs(user, showall, jobId, numRecords, showDetails);
+    }
+  }
+
+  /*
+   * Job callable task for job list operation. Overrides behavior of execute() to list jobs.
+   * No need to override behavior of cleanup() as there is nothing to be done if list jobs
+   * operation is timed out or interrupted.
+   */
+  private JobCallable<List<JobItemBean>> getJobListTask(final String user, final boolean showall,
+                      final String jobId, final int numRecords, final boolean showDetails) {
+    return new JobCallable<List<JobItemBean>>() {
+      @Override
+      public List<JobItemBean> execute() throws NotAuthorizedException, BadParam, IOException,
+                                             InterruptedException {
+       /*
+        * Change the current thread name to include parent thread Id if it is executed
+        * in thread pool. Useful to extract logs specific to a job request and helpful
+        * to debug job issues.
+        */
+        Thread.currentThread().setName(String.format("%s-%s-%s", JOB_LIST_EXECUTE_THREAD_PREFIX,
+                                       listThreadId, Thread.currentThread().getId()));
+
+        return listJobs(user, showall, jobId, numRecords, showDetails);
+      }
+    };
+  }
+
+  /*
+   * Gets list of job ids and calls getJobStatus to get status for each job id.
+   */
+  public List<JobItemBean> listJobs(String user, boolean showall, String jobId,
+                                    int numRecords, boolean showDetails)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException {
 
-    UserGroupInformation ugi = UgiFactory.getUgi(user);
+    UserGroupInformation ugi = null;
     WebHCatJTShim tracker = null;
+    ArrayList<String> ids = new ArrayList<String>();
+
     try {
+      ugi = UgiFactory.getUgi(user);
       tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
 
-      ArrayList<String> ids = new ArrayList<String>();
-
       JobStatus[] jobs = tracker.getAllJobs();
 
       if (jobs != null) {
@@ -54,13 +122,81 @@ public class ListDelegator extends TempletonDelegator {
             ids.add(id);
         }
       }
-
-      return ids;
     } catch (IllegalStateException e) {
       throw new BadParam(e.getMessage());
     } finally {
       if (tracker != null)
         tracker.close();
+      if (ugi != null)
+        FileSystem.closeAllForUGI(ugi);
     }
+
+    return getJobStatus(ids, user, showall, jobId, numRecords, showDetails);
+  }
+
+  /*
+   * Returns job status for list of input jobs as a list.
+   */
+  public List<JobItemBean> getJobStatus(ArrayList<String> jobIds, String user, boolean showall,
+                                       String jobId, int numRecords, boolean showDetails)
+                                       throws IOException, InterruptedException {
+
+    List<JobItemBean> detailList = new ArrayList<JobItemBean>();
+    int currRecord = 0;
+
+    // Sort the list as requested
+    boolean isAscendingOrder = true;
+    switch (appConf.getListJobsOrder()) {
+    case lexicographicaldesc:
+      Collections.sort(jobIds, Collections.reverseOrder());
+      isAscendingOrder = false;
+      break;
+    case lexicographicalasc:
+    default:
+      Collections.sort(jobIds);
+      break;
+    }
+
+    for (String job : jobIds) {
+      // If numRecords = -1, fetch all records.
+      // Hence skip all the below checks when numRecords = -1.
+      if (numRecords != -1) {
+        // If currRecord >= numRecords, we have already fetched the top #numRecords
+        if (currRecord >= numRecords) {
+          break;
+        }
+        else if (jobId == null || jobId.trim().length() == 0) {
+            currRecord++;
+        }
+        // If the current record needs to be returned based on the
+        // filter conditions specified by the user, increment the counter
+        else if (isAscendingOrder && job.compareTo(jobId) > 0 || !isAscendingOrder && job.compareTo(jobId) < 0) {
+          currRecord++;
+        }
+        // The current record should not be included in the output detailList.
+        else {
+          continue;
+        }
+      }
+      JobItemBean jobItem = new JobItemBean();
+      jobItem.id = job;
+      if (showDetails) {
+        StatusDelegator sd = new StatusDelegator(appConf);
+        try {
+          jobItem.detail = sd.run(user, job, false);
+        }
+        catch(Exception ex) {
+          /*
+           * if we could not get status for some reason, log it, and send empty status back with
+           * just the ID so that caller knows to even look in the log file
+           */
+          LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
+          jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
+        }
+      }
+      detailList.add(jobItem);
+    }
+
+    return detailList;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
index 5208bf4..3ed3ece 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 
 import org.slf4j.Logger;
@@ -43,14 +44,15 @@ import org.eclipse.jetty.rewrite.handler.RedirectPatternRule;
 import org.eclipse.jetty.rewrite.handler.RewriteHandler;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.FilterHolder;
-import org.eclipse.jetty.servlet.FilterMapping;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.xml.XmlConfiguration;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
+import javax.servlet.DispatcherType;
 import javax.servlet.http.HttpServletRequest;
 
 /**
@@ -122,7 +124,7 @@ public class Main {
       checkEnv();
       runServer(port);
       // Currently only print the first port to be consistent with old behavior
-      port =  ArrayUtils.isEmpty(server.getConnectors()) ? -1 : server.getConnectors()[0].getPort();
+      port =  ArrayUtils.isEmpty(server.getConnectors()) ? -1 : ((ServerConnector)(server.getConnectors()[0])).getLocalPort();
 
       System.out.println("templeton: listening on port " + port);
       LOG.info("Templeton listening on port " + port);
@@ -185,6 +187,7 @@ public class Main {
 
     // Add the Auth filter
     FilterHolder fHolder = makeAuthFilter();
+    EnumSet<DispatcherType> dispatches = EnumSet.of(DispatcherType.REQUEST);
 
     /* 
      * We add filters for each of the URIs supported by templeton.
@@ -193,28 +196,18 @@ public class Main {
      * This is because mapreduce does not use secure credentials for 
      * callbacks. So jetty would fail the request as unauthorized.
      */ 
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", 
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", 
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", 
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*",
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", 
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*",
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", 
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", 
-             FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", 
-             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", dispatches);
 
     if (conf.getBoolean(AppConfig.XSRF_FILTER_ENABLED, false)){
-      root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*",
-             FilterMapping.REQUEST);
+      root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*", dispatches);
       LOG.debug("XSRF filter enabled");
     } else {
       LOG.warn("XSRF filter disabled");