You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:43:32 UTC

[42/51] [partial] hive git commit: Revert "HIVE-14671 : merge master into hive-14535 (Wei Zheng)"

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 6f96e1d..f7e3e3a 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RawStoreProxy;
@@ -57,7 +56,6 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
-import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
@@ -88,17 +86,23 @@ public class DbNotificationListener extends MetaStoreEventListener {
   // HiveConf rather than a Configuration.
   private HiveConf hiveConf;
   private MessageFactory msgFactory;
-
-  private synchronized void init(HiveConf conf) throws MetaException {
-    if (cleaner == null) {
-      cleaner =
-          new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf,
-              conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999));
+  private RawStore rs;
+
+  private synchronized void init(HiveConf conf) {
+    try {
+      rs = RawStoreProxy.getProxy(conf, conf,
+          conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999);
+    } catch (MetaException e) {
+      LOG.error("Unable to connect to raw store, notifications will not be tracked", e);
+      rs = null;
+    }
+    if (cleaner == null && rs != null) {
+      cleaner = new CleanerThread(conf, rs);
       cleaner.start();
     }
   }
 
-  public DbNotificationListener(Configuration config) throws MetaException {
+  public DbNotificationListener(Configuration config) {
     super(config);
     // The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor
     // with a Configuration parameter, so we have to declare config as Configuration.  But it
@@ -138,7 +142,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event, tableEvent);
+    process(event);
   }
 
   /**
@@ -153,7 +157,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event, tableEvent);
+    process(event);
   }
 
   /**
@@ -166,10 +170,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
     Table after = tableEvent.getNewTable();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
-            .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString());
+            .buildAlterTableMessage(before, after).toString());
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
-    process(event, tableEvent);
+    process(event);
   }
 
   class FileIterator implements Iterator<String> {
@@ -277,7 +281,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event, partitionEvent);
+    process(event);
   }
 
   /**
@@ -292,7 +296,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event, partitionEvent);
+    process(event);
   }
 
   /**
@@ -305,10 +309,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
     Partition after = partitionEvent.getNewPartition();
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
-            .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString());
+            .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
-    process(event, partitionEvent);
+    process(event);
   }
 
   /**
@@ -322,7 +326,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
             .buildCreateDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event, dbEvent);
+    process(event);
   }
 
   /**
@@ -336,7 +340,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
             .buildDropDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event, dbEvent);
+    process(event);
   }
 
   /**
@@ -350,7 +354,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
             .buildCreateFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event, fnEvent);
+    process(event);
   }
 
   /**
@@ -364,7 +368,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
             .buildDropFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event, fnEvent);
+    process(event);
   }
 
   /**
@@ -378,7 +382,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
             .buildCreateIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event, indexEvent);
+    process(event);
   }
 
   /**
@@ -392,7 +396,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
             .buildDropIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event, indexEvent);
+    process(event);
   }
 
   /**
@@ -407,7 +411,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
             .buildAlterIndexMessage(before, after).toString());
     event.setDbName(before.getDbName());
-    process(event, indexEvent);
+    process(event);
   }
 
   class FileChksumIterator implements Iterator<String> {
@@ -439,12 +443,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
   public void onInsert(InsertEvent insertEvent) throws MetaException {
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
-            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
+            insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
             new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
             .toString());
     event.setDbName(insertEvent.getDb());
     event.setTableName(insertEvent.getTable());
-    process(event, insertEvent);
+    process(event);
   }
 
   /**
@@ -468,27 +472,18 @@ public class DbNotificationListener extends MetaStoreEventListener {
     return (int)millis;
   }
 
-  /**
-   * Process this notification by adding it to metastore DB.
-   *
-   * @param event NotificationEvent is the object written to the metastore DB.
-   * @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set the
-   *                      DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
-   */
-  private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
+  // Process this notification by adding it to metastore DB
+  private void process(NotificationEvent event) {
     event.setMessageFormat(msgFactory.getMessageFormat());
-    synchronized (NOTIFICATION_TBL_LOCK) {
-      LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
-          event.getMessage());
-      HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
-    }
-
-      // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
-      if (event.isSetEventId()) {
-        listenerEvent.putParameter(
-            MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
-            Long.toString(event.getEventId()));
+    if (rs != null) {
+      synchronized (NOTIFICATION_TBL_LOCK) {
+        LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
+            event.getMessage());
+        rs.addNotificationEvent(event);
       }
+    } else {
+      LOG.warn("Dropping event " + event + " since notification is not running.");
+    }
   }
 
   private static class CleanerThread extends Thread {

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
deleted file mode 100644
index a4f2d59..0000000
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.hcatalog.listener;
-
-/**
- * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent
- * parameters.
- */
-public class MetaStoreEventListenerConstants {
-  /*
-   * DbNotificationListener keys reserved for updating ListenerEvent parameters.
-   *
-   * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key will have the event identifier that DbNotificationListener
-   *                                   processed during an event. This event identifier might be shared
-   *                                   across other MetaStoreEventListener implementations.
-   */
-  public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME";
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index 5bea0a6..e765305 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.hive.hcatalog</groupId>
     <artifactId>hive-hcatalog</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
deleted file mode 100644
index 78987ab..0000000
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.hcatalog.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.RegexSerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.Text;
-
-/**
- * Streaming Writer handles text input data with regex. Uses
- * org.apache.hadoop.hive.serde2.RegexSerDe
- */
-public class StrictRegexWriter extends AbstractRecordWriter {
-  private RegexSerDe serde;
-  private final StructObjectInspector recordObjInspector;
-  private final ObjectInspector[] bucketObjInspectors;
-  private final StructField[] bucketStructFields;
-  
-  /**
-   * @param endPoint the end point to write to
-   * @param conn connection this Writer is to be used with
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
-          throws ConnectionError, SerializationError, StreamingException {
-    this(null, endPoint, null, conn);
-  }
-  
-  /**
-   * @param endPoint the end point to write to
-   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
-   * @param conn connection this Writer is to be used with
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
-          throws ConnectionError, SerializationError, StreamingException {
-    this(null, endPoint, conf, conn);
-  }
-  
-  /**
-   * @param regex to parse the data
-   * @param endPoint the end point to write to
-   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
-   * @param conn connection this Writer is to be used with
-   * @throws ConnectionError
-   * @throws SerializationError
-   * @throws StreamingException
-   */
-  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
-          throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, conf, conn);
-    this.serde = createSerde(tbl, conf, regex);
-    // get ObjInspectors for entire record and bucketed cols
-    try {
-      recordObjInspector = ( StructObjectInspector ) serde.getObjectInspector();
-      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
-    } catch (SerDeException e) {
-      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
-    }
-
-    // get StructFields for bucketed cols
-    bucketStructFields = new StructField[bucketIds.size()];
-    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
-    for (int i = 0; i < bucketIds.size(); i++) {
-      bucketStructFields[i] = allFields.get(bucketIds.get(i));
-    }
-  }
-  
-  @Override
-  public AbstractSerDe getSerde() {
-    return serde;
-  }
-
-  @Override
-  protected StructObjectInspector getRecordObjectInspector() {
-    return recordObjInspector;
-  }
-
-  @Override
-  protected StructField[] getBucketStructFields() {
-    return bucketStructFields;
-  }
-
-  @Override
-  protected ObjectInspector[] getBucketObjectInspectors() {
-    return bucketObjInspectors;
-  }
-
-
-  @Override
-  public void write(long transactionId, byte[] record)
-          throws StreamingIOFailure, SerializationError {
-    try {
-      Object encodedRow = encode(record);
-      int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(transactionId, encodedRow);
-    } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction("
-              + transactionId + ")", e);
-    }
-  }
-
-  /**
-   * Creates RegexSerDe
-   * @param tbl   used to create serde
-   * @param conf  used to create serde
-   * @param regex  used to create serde
-   * @return
-   * @throws SerializationError if serde could not be initialized
-   */
-  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
-          throws SerializationError {
-    try {
-      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
-      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
-      ArrayList<String> tableColumns = getCols(tbl);
-      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
-      RegexSerDe serde = new RegexSerDe();
-      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
-      return serde;
-    } catch (SerDeException e) {
-      throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
-    }
-  }
-  
-  private static ArrayList<String> getCols(Table table) {
-    List<FieldSchema> cols = table.getSd().getCols();
-    ArrayList<String> colNames = new ArrayList<String>(cols.size());
-    for (FieldSchema col : cols) {
-      colNames.add(col.getName().toLowerCase());
-    }
-    return colNames;
-  }
-
-  /**
-   * Encode Utf8 encoded string bytes using RegexSerDe
-   * 
-   * @param utf8StrRecord
-   * @return The encoded object
-   * @throws SerializationError
-   */
-  @Override
-  public Object encode(byte[] utf8StrRecord) throws SerializationError {
-    try {
-      Text blob = new Text(utf8StrRecord);
-      return serde.deserialize(blob);
-    } catch (SerDeException e) {
-      throw new SerializationError("Unable to convert byte[] record into Object", e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 097de9b..bf29993 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,6 +64,10 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -78,15 +82,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
-import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.tools.FileDump;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -485,9 +485,9 @@ public class TestStreaming {
 
     NullWritable key = rr.createKey();
     OrcStruct value = rr.createValue();
-    for (String record : records) {
+    for (int i = 0; i < records.length; i++) {
       Assert.assertEquals(true, rr.next(key, value));
-      Assert.assertEquals(record, value.toString());
+      Assert.assertEquals(records[i], value.toString());
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
@@ -741,7 +741,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -753,11 +753,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -787,75 +787,6 @@ public class TestStreaming {
   }
 
   @Test
-  public void testTransactionBatchCommit_Regex() throws Exception {
-    testTransactionBatchCommit_Regex(null);
-  }
-  @Test
-  public void testTransactionBatchCommit_RegexUGI() throws Exception {
-    testTransactionBatchCommit_Regex(Utils.getUGI());
-  }
-  private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-      partitionVals);
-    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
-    String regex = "([^,]*),(.*)";
-    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
-
-    // 1st Txn
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.commit();
-
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
-
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-      , txnBatch.getCurrentTransactionState());
-
-    // 2nd Txn
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("2,Welcome to streaming".getBytes());
-
-    // data should not be visible
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
-
-    txnBatch.commit();
-
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
-      "{2, Welcome to streaming}");
-
-    txnBatch.close();
-    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-      , txnBatch.getCurrentTransactionState());
-
-
-    connection.close();
-
-
-    // To Unpartitioned table
-    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
-    regex = "([^:]*):(.*)";
-    writer = new StrictRegexWriter(regex, endPt, conf, connection);
-
-    // 1st Txn
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("1:Hello streaming".getBytes());
-    txnBatch.commit();
-
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-      , txnBatch.getCurrentTransactionState());
-    connection.close();
-  }
-  
-  @Test
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
@@ -871,7 +802,7 @@ public class TestStreaming {
     txnBatch.write(rec1.getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -998,7 +929,7 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1017,13 +948,13 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1034,14 +965,14 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
             "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
             "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
             "{4, Welcome to streaming - once again}");
 
@@ -1078,11 +1009,11 @@ public class TestStreaming {
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.beginNextTransaction();
     txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -1090,17 +1021,17 @@ public class TestStreaming {
     txnBatch2.beginNextTransaction();
     txnBatch2.write("4,Welcome to streaming - once again".getBytes());
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}");
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
         "{2, Welcome to streaming}",
         "{3, Hello streaming - once again}",
         "{4, Welcome to streaming - once again}");
@@ -1769,7 +1700,7 @@ public class TestStreaming {
     txnBatch.heartbeat();//this is no-op on closed batch
     txnBatch.abort();//ditto
     GetOpenTxnsInfoResponse r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
     List<TxnInfo> ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
     Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1833,7 +1764,7 @@ public class TestStreaming {
       expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
 
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
     Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1856,7 +1787,7 @@ public class TestStreaming {
       expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
     
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
     Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/java-client/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml
index 3bb9f4d..3b53664 100644
--- a/hcatalog/webhcat/java-client/pom.xml
+++ b/hcatalog/webhcat/java-client/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.hive.hcatalog</groupId>
     <artifactId>hive-hcatalog</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
index 86d3acb..b9cb067 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
@@ -434,7 +434,7 @@ public class TestHCatClient {
     HCatClient client = HCatClient.create(new Configuration(hcatConf));
     boolean isExceptionCaught = false;
     // Table creation with a long table name causes ConnectionFailureException
-    final String tableName = "Temptable" + new BigInteger(260, new Random()).toString(2);
+    final String tableName = "Temptable" + new BigInteger(200, new Random()).toString(2);
 
     ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
     cols.add(new HCatFieldSchema("id", Type.INT, "id columns"));

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/pom.xml b/hcatalog/webhcat/svr/pom.xml
index a55ffe9..c5ad387 100644
--- a/hcatalog/webhcat/svr/pom.xml
+++ b/hcatalog/webhcat/svr/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.hive.hcatalog</groupId>
     <artifactId>hive-hcatalog</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
@@ -45,50 +45,9 @@
       <artifactId>hive-hcatalog-core</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-runner</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-sslengine</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-	  <artifactId>jetty-util</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-	  <artifactId>jsp-2.1</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-	  <artifactId>jsp-api-2.1</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <!-- inter-project -->
     <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-rewrite</artifactId>
-      <version>${jetty.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-server</artifactId>
-      <version>${jetty.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-servlet</artifactId>
-      <version>${jetty.version}</version>
-    </dependency>
-    <dependency>
       <groupId>com.sun.jersey</groupId>
       <artifactId>jersey-core</artifactId>
       <version>${jersey.version}</version>
@@ -134,6 +93,11 @@
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty.aggregate</groupId>
+      <artifactId>jetty-all-server</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jul-to-slf4j</artifactId>
       <version>${slf4j.version}</version>
@@ -143,7 +107,7 @@
       <artifactId>hadoop-auth</artifactId>
       <version>${hadoop.version}</version>
         <exclusions>
-          <exclusion>
+             <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
           </exclusion>
@@ -157,42 +121,16 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-	</exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <!-- test inter-project -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 0ea7d88..54d0907 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -111,43 +111,6 @@ public class AppConfig extends Configuration {
   public static final String MR_AM_MEMORY_MB     = "templeton.mr.am.memory.mb";
   public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder";
 
-  /*
-   * These parameters controls the maximum number of concurrent job submit/status/list
-   * operations in templeton service. If more number of concurrent requests comes then
-   * they will be rejected with BusyException.
-   */
-  public static final String JOB_SUBMIT_MAX_THREADS = "templeton.parallellism.job.submit";
-  public static final String JOB_STATUS_MAX_THREADS = "templeton.parallellism.job.status";
-  public static final String JOB_LIST_MAX_THREADS = "templeton.parallellism.job.list";
-
-  /*
-   * These parameters controls the maximum time job submit/status/list operation is
-   * executed in templeton service. On time out, the execution is interrupted and
-   * TimeoutException is returned to client. On time out
-   *   For list and status operation, there is no action needed as they are read requests.
-   *   For submit operation, we do best effort to kill the job if its generated. Enabling
-   *     this parameter may have following side effects
-   *     1) There is a possibility for having active job for some time when the client gets
-   *        response for submit operation and a list operation from client could potential
-   *        show the newly created job which may eventually be killed with no guarantees.
-   *     2) If submit operation retried by client then there is a possibility of duplicate
-   *        jobs triggered.
-   *
-   * Time out configs should be configured in seconds.
-   *
-   */
-  public static final String JOB_SUBMIT_TIMEOUT   = "templeton.job.submit.timeout";
-  public static final String JOB_STATUS_TIMEOUT   = "templeton.job.status.timeout";
-  public static final String JOB_LIST_TIMEOUT   = "templeton.job.list.timeout";
-
-  /*
-   * If task execution time out is configured for submit operation then job may need to
-   * be killed on execution time out. These parameters controls the maximum number of
-   * retries and retry wait time in seconds for executing the time out task.
-   */
-  public static final String JOB_TIMEOUT_TASK_RETRY_COUNT = "templeton.job.timeout.task.retry.count";
-  public static final String JOB_TIMEOUT_TASK_RETRY_INTERVAL = "templeton.job.timeout.task.retry.interval";
-
   /**
    * see webhcat-default.xml
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
index 622f92d..4b2dfec 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
@@ -24,7 +24,6 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -42,11 +41,10 @@ public class DeleteDelegator extends TempletonDelegator {
   public QueueStatusBean run(String user, String id)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException
   {
-    UserGroupInformation ugi = null;
+    UserGroupInformation ugi = UgiFactory.getUgi(user);
     WebHCatJTShim tracker = null;
     JobState state = null;
     try {
-      ugi = UgiFactory.getUgi(user);
       tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
       JobID jobid = StatusDelegator.StringToJobID(id);
       if (jobid == null)
@@ -71,8 +69,6 @@ public class DeleteDelegator extends TempletonDelegator {
         tracker.close();
       if (state != null)
         state.close();
-      if (ugi != null)
-        FileSystem.closeAllForUGI(ugi);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 1953028..f0296cb 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -49,7 +49,7 @@ public class HiveDelegator extends LauncherDelegator {
                String statusdir, String callback, String completedUrl, boolean enablelog,
                Boolean enableJobReconnect)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException, TooManyRequestsException
+    ExecuteException, IOException, InterruptedException
   {
     runAs = user;
     List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir,

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 1246b40..84cd5b9 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -46,7 +46,7 @@ public class JarDelegator extends LauncherDelegator {
                boolean usesHcatalog, String completedUrl,
                boolean enablelog, Boolean enableJobReconnect, JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+    ExecuteException, IOException, InterruptedException {
     runAs = user;
     List<String> args = makeArgs(jar, mainClass,
       libjars, files, jarArgs, defines,

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
deleted file mode 100644
index e703eff..0000000
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.util.concurrent.Callable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class JobCallable<T> implements Callable<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(JobCallable.class);
-
-  static public enum JobState {
-    STARTED,
-    FAILED,
-    COMPLETED
-  }
-
-  /*
-   * Job state of job request. Changes to the state are synchronized using
-   * setStateAndResult. This is required due to two different threads,
-   * main thread and job execute thread, tries to change state and organize
-   * clean up tasks.
-   */
-  private JobState jobState = JobState.STARTED;
-
-  /*
-   * Result of JobCallable task after successful task completion. This is
-   * expected to be set by the thread which executes JobCallable task.
-   */
-  public T returnResult = null;
-
-  /*
-   * Sets the job state to FAILED. Returns true if FAILED status is set.
-   * Otherwise, it returns false.
-   */
-  public boolean setJobStateFailed() {
-    return setStateAndResult(JobState.FAILED, null);
-  }
-
-  /*
-   * Sets the job state to COMPLETED and also sets the results value. Returns true
-   * if COMPLETED status is set. Otherwise, it returns false.
-   */
-  public boolean setJobStateCompleted(T result) {
-    return setStateAndResult(JobState.COMPLETED, result);
-  }
-
-  /*
-   * Sets the job state and result. Returns true if status and result are set.
-   * Otherwise, it returns false.
-   */
-  private synchronized boolean setStateAndResult(JobState jobState, T result) {
-    if (this.jobState == JobState.STARTED) {
-      this.jobState = jobState;
-      this.returnResult = result;
-      return true;
-    } else {
-      LOG.info("Failed to set job state to " + jobState + " due to job state "
-                  + this.jobState + ". Expected state is " + JobState.STARTED);
-    }
-
-    return false;
-  }
-
-  /*
-   * Executes the callable task with help of execute() call and gets the result
-   * of the task. It also sets job status as COMPLETED if state is not already
-   * set to FAILED and returns result to future.
-   */
-  public T call() throws Exception {
-
-    /*
-     * Don't catch any execution exceptions here and let the caller catch it.
-     */
-    T result = this.execute();
-
-    if (!this.setJobStateCompleted(result)) {
-     /*
-      * Failed to set job status as COMPLETED which mean the main thread would have
-      * exited and not waiting for the result. Call cleanup() to execute any cleanup.
-      */
-      cleanup();
-      return null;
-    }
-
-    return this.returnResult;
-  }
-
-  /*
-   * Abstract method to be overridden for task execution.
-   */
-  public abstract T execute() throws Exception;
-
-  /*
-   * Cleanup method called to run cleanup tasks if job state is FAILED. By default,
-   * no cleanup is provided.
-   */
-  public void cleanup() {}
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
deleted file mode 100644
index 9ac4588..0000000
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JobRequestExecutor<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(JobRequestExecutor.class);
-  private static AppConfig appConf = Main.getAppConfigInstance();
-
-  /*
-   * Thread pool to execute job requests.
-   */
-  private ThreadPoolExecutor jobExecutePool = null;
-
-  /*
-   * Type of job request.
-   */
-  private JobRequestType requestType;
-
-  /*
-   * Config name used to find the number of concurrent requests.
-   */
-  private String concurrentRequestsConfigName;
-
-  /*
-   * Config name used to find the maximum time job request can be executed.
-   */
-  private String jobTimeoutConfigName;
-
-  /*
-   * Job request execution time out in seconds. If it is 0 then request
-   * will not be timed out.
-   */
-  private int requestExecutionTimeoutInSec = 0;
-
-  /*
-   * Amount of time a thread can be alive in thread pool before cleaning this up. Core threads
-   * will not be cleanup from thread pool.
-   */
-  private int threadKeepAliveTimeInHours = 1;
-
-  /*
-   * Maximum number of times a cancel request is sent to job request execution
-   * thread. Future.cancel may not be able to interrupt the thread if it is
-   * blocked on network calls.
-   */
-  private int maxTaskCancelRetryCount = 10;
-
-  /*
-   * Wait time in milliseconds before another cancel request is made.
-   */
-  private int maxTaskCancelRetryWaitTimeInMs = 1000;
-
-  /*
-   * A flag to indicate whether to cancel the task when exception TimeoutException or
-   * InterruptedException or CancellationException raised. The default is cancel thread.
-   */
-  private boolean enableCancelTask = true;
-
-  /*
-   * Job Request type.
-   */
-  public enum JobRequestType {
-    Submit,
-    Status,
-    List
-  }
-
-  /*
-   * Creates a job request object and sets up execution environment. Creates a thread pool
-   * to execute job requests.
-   *
-   * @param requestType
-   *          Job request type
-   *
-   * @param concurrentRequestsConfigName
-   *          Config name to be used to extract number of concurrent requests to be serviced.
-   *
-   * @param jobTimeoutConfigName
-   *          Config name to be used to extract maximum time a task can execute a request.
-   *
-   * @param enableCancelTask
-   *          A flag to indicate whether to cancel the task when exception TimeoutException
-   *          or InterruptedException or CancellationException raised.
-   *
-   */
-  public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
-                            String jobTimeoutConfigName, boolean enableCancelTask) {
-
-    this.concurrentRequestsConfigName = concurrentRequestsConfigName;
-    this.jobTimeoutConfigName = jobTimeoutConfigName;
-    this.requestType = requestType;
-    this.enableCancelTask = enableCancelTask;
-
-    /*
-     * The default number of threads will be 0. That means thread pool is not used and
-     * operation is executed with the current thread.
-     */
-    int threads = !StringUtils.isEmpty(concurrentRequestsConfigName) ?
-                                appConf.getInt(concurrentRequestsConfigName, 0) : 0;
-
-    if (threads > 0) {
-      /*
-       * Create a thread pool with no queue wait time to execute the operation. This will ensure
-       * that job requests are rejected if there are already maximum number of threads busy.
-       */
-      this.jobExecutePool = new ThreadPoolExecutor(threads, threads,
-                             threadKeepAliveTimeInHours, TimeUnit.HOURS,
-                             new SynchronousQueue<Runnable>());
-       this.jobExecutePool.allowCoreThreadTimeOut(true);
-
-      /*
-       * Get the job request time out value. If this configuration value is set to 0
-       * then job request will wait until it finishes.
-       */
-      if (!StringUtils.isEmpty(jobTimeoutConfigName)) {
-        this.requestExecutionTimeoutInSec = appConf.getInt(jobTimeoutConfigName, 0);
-      }
-
-      LOG.info("Configured " + threads + " threads for job request type " + this.requestType
-                 + " with time out " + this.requestExecutionTimeoutInSec + " s.");
-    } else {
-      /*
-       * If threads are not configured then they will be executed in current thread itself.
-       */
-      LOG.info("No thread pool configured for job request type " + this.requestType);
-    }
-  }
-
-  /*
-   * Creates a job request object and sets up execution environment. Creates a thread pool
-   * to execute job requests.
-   *
-   * @param requestType
-   *          Job request type
-   *
-   * @param concurrentRequestsConfigName
-   *          Config name to be used to extract number of concurrent requests to be serviced.
-   *
-   * @param jobTimeoutConfigName
-   *          Config name to be used to extract maximum time a task can execute a request.
-   *
-   */
-  public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
-                            String jobTimeoutConfigName) {
-    this(requestType, concurrentRequestsConfigName, jobTimeoutConfigName, true);
-  }
-
-  /*
-   * Returns true of thread pool is created and can be used for executing a job request.
-   * Otherwise, returns false.
-   */
-  public boolean isThreadPoolEnabled() {
-    return this.jobExecutePool != null;
-  }
-
-  /*
-   * Executes job request operation. If thread pool is not created then job request is
-   * executed in current thread itself.
-   *
-   * @param jobExecuteCallable
-   *          Callable object to run the job request task.
-   *
-   */
-  public T execute(JobCallable<T> jobExecuteCallable) throws InterruptedException,
-                 TimeoutException, TooManyRequestsException, ExecutionException {
-    /*
-     * The callable shouldn't be null to execute. The thread pool also should be configured
-     * to execute requests.
-     */
-    assert (jobExecuteCallable != null);
-    assert (this.jobExecutePool != null);
-
-    String type = this.requestType.toString().toLowerCase();
-
-    String retryMessageForConcurrentRequests = "Please wait for some time before retrying "
-                  + "the operation. Please refer to the config " + concurrentRequestsConfigName
-                  + " to configure concurrent requests.";
-
-    LOG.debug("Starting new " + type + " job request with time out " + this.requestExecutionTimeoutInSec
-              + "seconds.");
-    Future<T> future = null;
-
-    try {
-      future = this.jobExecutePool.submit(jobExecuteCallable);
-    } catch (RejectedExecutionException rejectedException) {
-      /*
-       * Not able to find thread to execute the job request. Raise Busy exception and client
-       * can retry the operation.
-       */
-      String tooManyRequestsExceptionMessage = "Unable to service the " + type + " job request as "
-                        + "templeton service is busy with too many " + type + " job requests. "
-                        + retryMessageForConcurrentRequests;
-
-      LOG.warn(tooManyRequestsExceptionMessage);
-      throw new TooManyRequestsException(tooManyRequestsExceptionMessage);
-    }
-
-    T result = null;
-
-    try {
-      result = this.requestExecutionTimeoutInSec > 0
-                ? future.get(this.requestExecutionTimeoutInSec, TimeUnit.SECONDS) : future.get();
-    } catch (TimeoutException e) {
-      /*
-       * See if the execution thread has just completed operation and result is available.
-       * If result is available then return the result. Otherwise, raise exception.
-       */
-      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
-        String message = this.requestType + " job request got timed out. Please wait for some time "
-                       + "before retrying the operation. Please refer to the config "
-                       + jobTimeoutConfigName + " to configure job request time out.";
-        LOG.warn(message);
-
-        /*
-         * Throw TimeoutException to caller.
-         */
-        throw new TimeoutException(message);
-      }
-    } catch (InterruptedException e) {
-      /*
-       * See if the execution thread has just completed operation and result is available.
-       * If result is available then return the result. Otherwise, raise exception.
-       */
-      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
-        String message = this.requestType + " job request got interrupted. Please wait for some time "
-                       + "before retrying the operation.";
-        LOG.warn(message);
-
-        /*
-         * Throw TimeoutException to caller.
-         */
-        throw new InterruptedException(message);
-      }
-    } catch (CancellationException e) {
-      /*
-       * See if the execution thread has just completed operation and result is available.
-       * If result is available then return the result. Otherwise, raise exception.
-       */
-      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
-        String message = this.requestType + " job request got cancelled and thread got interrupted. "
-                       + "Please wait for some time before retrying the operation.";
-        LOG.warn(message);
-
-        throw new InterruptedException(message);
-      }
-    } finally {
-      /*
-       * If the thread is still active and needs to be cancelled then cancel it. This may
-       * happen in case task got interrupted, or timed out.
-       */
-      if (enableCancelTask) {
-        cancelExecutePoolThread(future);
-      }
-    }
-
-    LOG.debug("Completed " + type + " job request.");
-
-    return result;
-  }
-
-  /*
-   * Initiate cancel request to cancel the thread execution and interrupt the thread.
-   * If thread interruption is not handled by jobExecuteCallable then thread may continue
-   * running to completion. The cancel call may fail for some scenarios. In that case,
-   * retry the cancel call until it returns true or max retry count is reached.
-   *
-   * @param future
-   *          Future object which has handle to cancel the thread.
-   *
-   */
-  private void cancelExecutePoolThread(Future<T> future) {
-    int retryCount = 0;
-    while(retryCount < this.maxTaskCancelRetryCount && !future.isDone()) {
-      LOG.info("Task is still executing the job request. Cancelling it with retry count: "
-               + retryCount);
-      if (future.cancel(true)) {
-        /*
-         * Cancelled the job request and return to client.
-         */
-        LOG.info("Cancel job request issued successfully.");
-        return;
-      }
-
-      retryCount++;
-      try {
-        Thread.sleep(this.maxTaskCancelRetryWaitTimeInMs);
-      } catch (InterruptedException e) {
-        /*
-         * Nothing to do. Just retry.
-         */
-      }
-    }
-
-    LOG.warn("Failed to cancel the job. isCancelled: " + future.isCancelled()
-                    + " Retry count: " + retryCount);
-  }
-
-  /*
-   * Tries to get the job result if job request is completed. Otherwise it sets job status
-   * to FAILED such that execute thread can do necessary clean up based on FAILED state.
-   */
-  private T tryGetJobResultOrSetJobStateFailed(JobCallable<T> jobExecuteCallable) {
-    if (!jobExecuteCallable.setJobStateFailed()) {
-      LOG.info("Job is already COMPLETED. Returning the result.");
-      return jobExecuteCallable.returnResult;
-    } else {
-      LOG.info("Job status set to FAILED. Job clean up to be done by execute thread "
-              + "after job request is executed.");
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index 9bea897..b3f44a2 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -23,19 +23,16 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.templeton.tool.JobState;
 import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
@@ -53,26 +50,9 @@ public class LauncherDelegator extends TempletonDelegator {
   static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP}
   private boolean secureMeatastoreAccess = false;
   private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*";
-  private final String JOB_SUBMIT_EXECUTE_THREAD_PREFIX = "JobSubmitExecute";
-  private final int jobTimeoutTaskRetryCount;
-  private final int jobTimeoutTaskRetryIntervalInSec;
-
-  /**
-   * Current thread used to set in execution threads.
-   */
-  private final String submitThreadId = Thread.currentThread().getName();
-
-  /**
-   * Job request executor to submit job requests.
-   */
-  private static JobRequestExecutor<EnqueueBean> jobRequest =
-                   new JobRequestExecutor<EnqueueBean>(JobRequestExecutor.JobRequestType.Submit,
-                   AppConfig.JOB_SUBMIT_MAX_THREADS, AppConfig.JOB_SUBMIT_TIMEOUT, false);
 
   public LauncherDelegator(AppConfig appConf) {
     super(appConf);
-    jobTimeoutTaskRetryCount = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 0);
-    jobTimeoutTaskRetryIntervalInSec = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 0);
   }
 
   public void registerJob(String id, String user, String callback,
@@ -90,102 +70,19 @@ public class LauncherDelegator extends TempletonDelegator {
     }
   }
 
-  /*
-   * Submit job request. If maximum concurrent job submit requests are configured then submit
-   * request will be executed on a thread from thread pool. If job submit request time out is
-   * configured then request execution thread will be interrupted if thread times out. Also
-   * does best efforts to identify if job is submitted and kill it quietly.
-   */
-  public EnqueueBean enqueueController(final String user, final Map<String, Object> userArgs,
-                     final String callback, final List<String> args)
-    throws NotAuthorizedException, BusyException, IOException, QueueException, TooManyRequestsException {
-
-    EnqueueBean bean = null;
-    final TempletonControllerJob controllerJob = getTempletonController();
-
-    if (jobRequest.isThreadPoolEnabled()) {
-      JobCallable<EnqueueBean> jobExecuteCallable = getJobSubmitTask(user, userArgs, callback,
-                                                                     args, controllerJob);
-      try {
-        bean = jobRequest.execute(jobExecuteCallable);
-      } catch (TimeoutException ex) {
-       /*
-        * Job request got timed out. Job kill should have started. Return to client with
-        * QueueException.
-        */
-        throw new QueueException(ex.getMessage());
-      } catch (InterruptedException ex) {
-       /*
-        * Job request got interrupted. Job kill should have started. Return to client with
-        * with QueueException.
-        */
-        throw new QueueException(ex.getMessage());
-      } catch (ExecutionException ex) {
-        /*
-         * ExecutionException is raised if job execution gets an exception. Return to client
-         * with the exception.
-         */
-        throw new QueueException(ex.getMessage());
-      }
-    } else {
-      LOG.info("No thread pool configured for submit job request. Executing "
-                      + "the job request in current thread.");
-
-      bean = enqueueJob(user, userArgs, callback, args, controllerJob);
-    }
-
-    return bean;
-  }
-
-  /*
-   * Job callable task for job submit operation. Overrides behavior of execute()
-   * to submit job. Also, overrides the behavior of cleanup() to kill the job in case
-   * job submission request is timed out or interrupted.
-   */
-  private JobCallable<EnqueueBean> getJobSubmitTask(final String user,
-                     final Map<String, Object> userArgs, final String callback,
-                     final List<String> args, final TempletonControllerJob controllerJob) {
-      return new JobCallable<EnqueueBean>() {
-        @Override
-        public EnqueueBean execute() throws NotAuthorizedException, BusyException, IOException,
-                                       QueueException {
-         /*
-          * Change the current thread name to include parent thread Id if it is executed
-          * in thread pool. Useful to extract logs specific to a job request and helpful
-          * to debug job issues.
-          */
-          Thread.currentThread().setName(String.format("%s-%s-%s", JOB_SUBMIT_EXECUTE_THREAD_PREFIX,
-                                       submitThreadId, Thread.currentThread().getId()));
-
-          return enqueueJob(user, userArgs, callback, args, controllerJob);
-        }
-
-        @Override
-        public void cleanup() {
-          /*
-           * Failed to set job status as COMPLETED which mean the main thread would have
-           * exited and not waiting for the result. Kill the submitted job.
-           */
-          LOG.info("Job kill not done by main thread. Trying to kill now.");
-          killTempletonJobWithRetry(user, controllerJob.getSubmittedId());
-        }
-      };
-  }
-
   /**
    * Enqueue the TempletonControllerJob directly calling doAs.
    */
-  public EnqueueBean enqueueJob(String user, Map<String, Object> userArgs, String callback,
-                     List<String> args, TempletonControllerJob controllerJob)
+  public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback,
+                     List<String> args)
     throws NotAuthorizedException, BusyException,
     IOException, QueueException {
-    UserGroupInformation ugi = null;
     try {
-      ugi = UgiFactory.getUgi(user);
+      UserGroupInformation ugi = UgiFactory.getUgi(user);
 
       final long startTime = System.nanoTime();
 
-      String id = queueAsUser(ugi, args, controllerJob);
+      String id = queueAsUser(ugi, args);
 
       long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
       LOG.debug("queued job " + id + " in " + elapsed + " ms");
@@ -199,91 +96,24 @@ public class LauncherDelegator extends TempletonDelegator {
       return new EnqueueBean(id);
     } catch (InterruptedException e) {
       throw new QueueException("Unable to launch job " + e);
-    } finally {
-      if (ugi != null) {
-        FileSystem.closeAllForUGI(ugi);
-      }
     }
   }
 
-  private String queueAsUser(UserGroupInformation ugi, final List<String> args,
-                            final TempletonControllerJob controllerJob)
+  private String queueAsUser(UserGroupInformation ugi, final List<String> args)
     throws IOException, InterruptedException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Launching job: " + args);
     }
     return ugi.doAs(new PrivilegedExceptionAction<String>() {
       public String run() throws Exception {
-        runTempletonControllerJob(controllerJob, args);
-        return controllerJob.getSubmittedId();
+        String[] array = new String[args.size()];
+        TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess, appConf);
+        ToolRunner.run(ctrl, args.toArray(array));
+        return ctrl.getSubmittedId();
       }
     });
   }
 
-  /*
-   * Kills templeton job with multiple retries if job exists. Returns true if kill job
-   * attempt is success. Otherwise returns false.
-   */
-  private boolean killTempletonJobWithRetry(String user, String jobId) {
-    /*
-     * Make null safe Check if the job submission has gone through and if job is valid.
-     */
-    if (StringUtils.startsWith(jobId, "job_")) {
-      LOG.info("Started killing the job " + jobId);
-
-      boolean success = false;
-      int count = 0;
-      do {
-        try {
-          count++;
-          killJob(user, jobId);
-          success = true;
-          LOG.info("Kill job attempt succeeded.");
-         } catch (Exception e) {
-          LOG.info("Failed to kill the job due to exception: " + e.getMessage());
-          LOG.info("Waiting for " + jobTimeoutTaskRetryIntervalInSec + "s before retrying "
-                       + "the operation. Iteration: " + count);
-          try {
-            Thread.sleep(jobTimeoutTaskRetryIntervalInSec * 1000);
-          } catch (InterruptedException ex) {
-            LOG.info("Got interrupted while waiting for next retry.");
-          }
-        }
-      } while (!success && count < jobTimeoutTaskRetryCount);
-
-      return success;
-    } else {
-      LOG.info("Couldn't find a valid job id after job request is timed out.");
-      return false;
-    }
-  }
-
-  /*
-   * Gets new templeton controller objects.
-   */
-  protected TempletonControllerJob getTempletonController() {
-    return new TempletonControllerJob(secureMeatastoreAccess, appConf);
-  }
-
-  /*
-   * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run
-   * the actual job.
-   */
-  protected int runTempletonControllerJob(TempletonControllerJob controllerJob, List<String> args)
-    throws IOException, InterruptedException, TimeoutException, Exception {
-    String[] array = new String[args.size()];
-    return ToolRunner.run(controllerJob, args.toArray(array));
-  }
-
-  /*
-   * Uses DeleteDelegator to kill a job and ignores all exceptions.
-   */
-  protected void killJob(String user, String jobId)
-  throws NotAuthorizedException, BadParam, IOException, InterruptedException {
-    DeleteDelegator d = new DeleteDelegator(appConf);
-    d.run(user, jobId);
-  }
-
   public List<String> makeLauncherArgs(AppConfig appConf, String statusdir,
                      String completedUrl,
                      List<String> copyFiles,
@@ -350,35 +180,24 @@ public class LauncherDelegator extends TempletonDelegator {
    */
   private String getShimLibjars() {
     WebHCatJTShim shim = null;
-    UserGroupInformation ugi = null;
     try {
-      ugi = UserGroupInformation.getCurrentUser();
-      shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
-
-      // Besides the HiveShims jar which is Hadoop version dependent we also
-      // always need to include hive shims common jars.
-      Path shimCommonJar = new Path(
-          TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
-      Path shimCommonSecureJar = new Path(
-          TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
-      Path shimJar = new Path(
-          TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
-
-      return String.format(
-          "%s,%s,%s",
-          shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
+      shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, UserGroupInformation.getCurrentUser());
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get shimLibJars", e);
-    } finally {
-      try {
-        if (ugi != null) {
-          FileSystem.closeAllForUGI(ugi);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to closeAllForUGI", e);
-      }
+      throw new RuntimeException("Failed to get WebHCatShim", e);
     }
 
+    // Besides the HiveShims jar which is Hadoop version dependent we also
+    // always need to include hive shims common jars.
+    Path shimCommonJar = new Path(
+        TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
+    Path shimCommonSecureJar = new Path(
+        TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
+    Path shimJar = new Path(
+        TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
+
+    return String.format(
+        "%s,%s,%s",
+        shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
   }
 
   // Storage vars
@@ -444,7 +263,7 @@ public class LauncherDelegator extends TempletonDelegator {
   }
   /**
    * This is called by subclasses when they determined that the sumbmitted job requires
-   * metastore access (e.g. Pig job that uses HCatalog).  This then determines if
+   * metastore access (e.g. Pig job that uses HCatalog).  This then determines if 
    * secure access is required and causes TempletonControllerJob to set up a delegation token.
    * @see TempletonControllerJob
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
index dfa59f8..a30ecd1 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
@@ -19,15 +19,9 @@
 package org.apache.hive.hcatalog.templeton;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobStatus;
@@ -37,82 +31,20 @@ import org.apache.hadoop.security.UserGroupInformation;
  * List jobs owned by a user.
  */
 public class ListDelegator extends TempletonDelegator {
-  private static final Log LOG = LogFactory.getLog(ListDelegator.class);
-  private final String JOB_LIST_EXECUTE_THREAD_PREFIX = "JobListExecute";
-
-  /**
-   * Current thread id used to set in execution threads.
-   */
-  private final String listThreadId = Thread.currentThread().getName();
-
-  /*
-   * Job request executor to list job status requests.
-   */
-  private static JobRequestExecutor<List<JobItemBean>> jobRequest =
-                   new JobRequestExecutor<List<JobItemBean>>(JobRequestExecutor.JobRequestType.List,
-                   AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT);
-
   public ListDelegator(AppConfig appConf) {
     super(appConf);
   }
 
-  /*
-   * List status jobs request. If maximum concurrent job list requests are configured then
-   * list request will be executed on a thread from thread pool. If job list request time out
-   * is configured then request execution thread will be interrupted if thread times out and
-   * does no action.
-   */
-  public List<JobItemBean> run(final String user, final boolean showall, final String jobId,
-                               final int numRecords, final boolean showDetails)
-    throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException,
-           TimeoutException, ExecutionException, TooManyRequestsException {
-
-    if (jobRequest.isThreadPoolEnabled()) {
-      return jobRequest.execute(getJobListTask(user, showall, jobId,numRecords, showDetails));
-    } else {
-      return listJobs(user, showall, jobId, numRecords, showDetails);
-    }
-  }
-
-  /*
-   * Job callable task for job list operation. Overrides behavior of execute() to list jobs.
-   * No need to override behavior of cleanup() as there is nothing to be done if list jobs
-   * operation is timed out or interrupted.
-   */
-  private JobCallable<List<JobItemBean>> getJobListTask(final String user, final boolean showall,
-                      final String jobId, final int numRecords, final boolean showDetails) {
-    return new JobCallable<List<JobItemBean>>() {
-      @Override
-      public List<JobItemBean> execute() throws NotAuthorizedException, BadParam, IOException,
-                                             InterruptedException {
-       /*
-        * Change the current thread name to include parent thread Id if it is executed
-        * in thread pool. Useful to extract logs specific to a job request and helpful
-        * to debug job issues.
-        */
-        Thread.currentThread().setName(String.format("%s-%s-%s", JOB_LIST_EXECUTE_THREAD_PREFIX,
-                                       listThreadId, Thread.currentThread().getId()));
-
-        return listJobs(user, showall, jobId, numRecords, showDetails);
-      }
-    };
-  }
-
-  /*
-   * Gets list of job ids and calls getJobStatus to get status for each job id.
-   */
-  public List<JobItemBean> listJobs(String user, boolean showall, String jobId,
-                                    int numRecords, boolean showDetails)
+  public List<String> run(String user, boolean showall)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException {
 
-    UserGroupInformation ugi = null;
+    UserGroupInformation ugi = UgiFactory.getUgi(user);
     WebHCatJTShim tracker = null;
-    ArrayList<String> ids = new ArrayList<String>();
-
     try {
-      ugi = UgiFactory.getUgi(user);
       tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
 
+      ArrayList<String> ids = new ArrayList<String>();
+
       JobStatus[] jobs = tracker.getAllJobs();
 
       if (jobs != null) {
@@ -122,81 +54,13 @@ public class ListDelegator extends TempletonDelegator {
             ids.add(id);
         }
       }
+
+      return ids;
     } catch (IllegalStateException e) {
       throw new BadParam(e.getMessage());
     } finally {
       if (tracker != null)
         tracker.close();
-      if (ugi != null)
-        FileSystem.closeAllForUGI(ugi);
     }
-
-    return getJobStatus(ids, user, showall, jobId, numRecords, showDetails);
-  }
-
-  /*
-   * Returns job status for list of input jobs as a list.
-   */
-  public List<JobItemBean> getJobStatus(ArrayList<String> jobIds, String user, boolean showall,
-                                       String jobId, int numRecords, boolean showDetails)
-                                       throws IOException, InterruptedException {
-
-    List<JobItemBean> detailList = new ArrayList<JobItemBean>();
-    int currRecord = 0;
-
-    // Sort the list as requested
-    boolean isAscendingOrder = true;
-    switch (appConf.getListJobsOrder()) {
-    case lexicographicaldesc:
-      Collections.sort(jobIds, Collections.reverseOrder());
-      isAscendingOrder = false;
-      break;
-    case lexicographicalasc:
-    default:
-      Collections.sort(jobIds);
-      break;
-    }
-
-    for (String job : jobIds) {
-      // If numRecords = -1, fetch all records.
-      // Hence skip all the below checks when numRecords = -1.
-      if (numRecords != -1) {
-        // If currRecord >= numRecords, we have already fetched the top #numRecords
-        if (currRecord >= numRecords) {
-          break;
-        }
-        else if (jobId == null || jobId.trim().length() == 0) {
-            currRecord++;
-        }
-        // If the current record needs to be returned based on the
-        // filter conditions specified by the user, increment the counter
-        else if (isAscendingOrder && job.compareTo(jobId) > 0 || !isAscendingOrder && job.compareTo(jobId) < 0) {
-          currRecord++;
-        }
-        // The current record should not be included in the output detailList.
-        else {
-          continue;
-        }
-      }
-      JobItemBean jobItem = new JobItemBean();
-      jobItem.id = job;
-      if (showDetails) {
-        StatusDelegator sd = new StatusDelegator(appConf);
-        try {
-          jobItem.detail = sd.run(user, job, false);
-        }
-        catch(Exception ex) {
-          /*
-           * if we could not get status for some reason, log it, and send empty status back with
-           * just the ID so that caller knows to even look in the log file
-           */
-          LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
-          jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
-        }
-      }
-      detailList.add(jobItem);
-    }
-
-    return detailList;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
index 3ed3ece..5208bf4 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
@@ -25,7 +25,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.HashMap;
 
 import org.slf4j.Logger;
@@ -44,15 +43,14 @@ import org.eclipse.jetty.rewrite.handler.RedirectPatternRule;
 import org.eclipse.jetty.rewrite.handler.RewriteHandler;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.FilterMapping;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.xml.XmlConfiguration;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
-import javax.servlet.DispatcherType;
 import javax.servlet.http.HttpServletRequest;
 
 /**
@@ -124,7 +122,7 @@ public class Main {
       checkEnv();
       runServer(port);
       // Currently only print the first port to be consistent with old behavior
-      port =  ArrayUtils.isEmpty(server.getConnectors()) ? -1 : ((ServerConnector)(server.getConnectors()[0])).getLocalPort();
+      port =  ArrayUtils.isEmpty(server.getConnectors()) ? -1 : server.getConnectors()[0].getPort();
 
       System.out.println("templeton: listening on port " + port);
       LOG.info("Templeton listening on port " + port);
@@ -187,7 +185,6 @@ public class Main {
 
     // Add the Auth filter
     FilterHolder fHolder = makeAuthFilter();
-    EnumSet<DispatcherType> dispatches = EnumSet.of(DispatcherType.REQUEST);
 
     /* 
      * We add filters for each of the URIs supported by templeton.
@@ -196,18 +193,28 @@ public class Main {
      * This is because mapreduce does not use secure credentials for 
      * callbacks. So jetty would fail the request as unauthorized.
      */ 
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", dispatches);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", 
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", 
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", 
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*",
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", 
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*",
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", 
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", 
+             FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", 
+             FilterMapping.REQUEST);
 
     if (conf.getBoolean(AppConfig.XSRF_FILTER_ENABLED, false)){
-      root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*", dispatches);
+      root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*",
+             FilterMapping.REQUEST);
       LOG.debug("XSRF filter enabled");
     } else {
       LOG.warn("XSRF filter disabled");