Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:29 UTC
[42/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index f7e3e3a..6f96e1d 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.RawStoreProxy;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
@@ -86,23 +88,17 @@ public class DbNotificationListener extends MetaStoreEventListener {
// HiveConf rather than a Configuration.
private HiveConf hiveConf;
private MessageFactory msgFactory;
- private RawStore rs;
-
- private synchronized void init(HiveConf conf) {
- try {
- rs = RawStoreProxy.getProxy(conf, conf,
- conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999);
- } catch (MetaException e) {
- LOG.error("Unable to connect to raw store, notifications will not be tracked", e);
- rs = null;
- }
- if (cleaner == null && rs != null) {
- cleaner = new CleanerThread(conf, rs);
+
+ private synchronized void init(HiveConf conf) throws MetaException {
+ if (cleaner == null) {
+ cleaner =
+ new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf,
+ conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999));
cleaner.start();
}
}
- public DbNotificationListener(Configuration config) {
+ public DbNotificationListener(Configuration config) throws MetaException {
super(config);
// The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor
// with a Configuration parameter, so we have to declare config as Configuration. But it
@@ -142,7 +138,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, tableEvent);
}
/**
@@ -157,7 +153,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropTableMessage(t).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, tableEvent);
}
/**
@@ -170,10 +166,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
Table after = tableEvent.getNewTable();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
- .buildAlterTableMessage(before, after).toString());
+ .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString());
event.setDbName(after.getDbName());
event.setTableName(after.getTableName());
- process(event);
+ process(event, tableEvent);
}
class FileIterator implements Iterator<String> {
@@ -281,7 +277,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, partitionEvent);
}
/**
@@ -296,7 +292,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, partitionEvent);
}
/**
@@ -309,10 +305,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
Partition after = partitionEvent.getNewPartition();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
- .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
+ .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString());
event.setDbName(before.getDbName());
event.setTableName(before.getTableName());
- process(event);
+ process(event, partitionEvent);
}
/**
@@ -326,7 +322,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
.buildCreateDatabaseMessage(db).toString());
event.setDbName(db.getName());
- process(event);
+ process(event, dbEvent);
}
/**
@@ -340,7 +336,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
.buildDropDatabaseMessage(db).toString());
event.setDbName(db.getName());
- process(event);
+ process(event, dbEvent);
}
/**
@@ -354,7 +350,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
.buildCreateFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- process(event);
+ process(event, fnEvent);
}
/**
@@ -368,7 +364,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
.buildDropFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- process(event);
+ process(event, fnEvent);
}
/**
@@ -382,7 +378,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
.buildCreateIndexMessage(index).toString());
event.setDbName(index.getDbName());
- process(event);
+ process(event, indexEvent);
}
/**
@@ -396,7 +392,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
.buildDropIndexMessage(index).toString());
event.setDbName(index.getDbName());
- process(event);
+ process(event, indexEvent);
}
/**
@@ -411,7 +407,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
.buildAlterIndexMessage(before, after).toString());
event.setDbName(before.getDbName());
- process(event);
+ process(event, indexEvent);
}
class FileChksumIterator implements Iterator<String> {
@@ -443,12 +439,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
public void onInsert(InsertEvent insertEvent) throws MetaException {
NotificationEvent event =
new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
- insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
+ insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
.toString());
event.setDbName(insertEvent.getDb());
event.setTableName(insertEvent.getTable());
- process(event);
+ process(event, insertEvent);
}
/**
@@ -472,18 +468,27 @@ public class DbNotificationListener extends MetaStoreEventListener {
return (int)millis;
}
- // Process this notification by adding it to metastore DB
- private void process(NotificationEvent event) {
+ /**
+ * Process this notification by adding it to metastore DB.
+ *
+ * @param event NotificationEvent is the object written to the metastore DB.
+ * @param listenerEvent ListenerEvent (on which the NotificationEvent was based), used only to set
+ * DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
+ */
+ private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
event.setMessageFormat(msgFactory.getMessageFormat());
- if (rs != null) {
- synchronized (NOTIFICATION_TBL_LOCK) {
- LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
- event.getMessage());
- rs.addNotificationEvent(event);
- }
- } else {
- LOG.warn("Dropping event " + event + " since notification is not running.");
+ synchronized (NOTIFICATION_TBL_LOCK) {
+ LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
+ event.getMessage());
+ HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
}
+
+ // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
+ if (event.isSetEventId()) {
+ listenerEvent.putParameter(
+ MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+ Long.toString(event.getEventId()));
+ }
}
private static class CleanerThread extends Thread {
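With this change process() stamps the id of the persisted NotificationEvent back onto the originating ListenerEvent, so listeners configured after DbNotificationListener in hive.metastore.event.listeners can correlate their own handling with the notification log entry. A minimal sketch of such a consumer (the class name is hypothetical, and reading the value back through ListenerEvent.getParameters() is an assumption based on the putParameter() call above):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
  import org.apache.hadoop.hive.metastore.api.MetaException;
  import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
  import org.apache.hive.hcatalog.listener.MetaStoreEventListenerConstants;

  public class EventIdLoggingListener extends MetaStoreEventListener {
    public EventIdLoggingListener(Configuration config) {
      super(config);
    }

    @Override
    public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
      // DbNotificationListener stamped the event id it wrote to the metastore DB;
      // read it back to tie this listener's output to that notification entry.
      String eventId = tableEvent.getParameters().get(
          MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME);
      if (eventId != null) {
        System.out.println("create_table recorded as notification event " + eventId);
      }
    }
  }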
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
new file mode 100644
index 0000000..a4f2d59
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+/**
+ * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent
+ * parameters.
+ */
+public class MetaStoreEventListenerConstants {
+ /*
+ * DbNotificationListener keys reserved for updating ListenerEvent parameters.
+ *
+ * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key will have the event identifier that DbNotificationListener
+ * processed during an event. This event identifier might be shared
+ * across other MetaStoreEventListener implementations.
+ */
+ public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME";
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index e765305..5bea0a6 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
new file mode 100644
index 0000000..78987ab
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming writer that handles text input data using a regex. Uses
+ * org.apache.hadoop.hive.serde2.RegexSerDe.
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+ private RegexSerDe serde;
+ private final StructObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+
+ /**
+ * @param endPoint the end point to write to
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ this(null, endPoint, null, conn);
+ }
+
+ /**
+ * @param endPoint the end point to write to
+ * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ this(null, endPoint, conf, conn);
+ }
+
+ /**
+ * @param regex to parse the data
+ * @param endPoint the end point to write to
+ * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+ * @param conn connection this Writer is to be used with
+ * @throws ConnectionError
+ * @throws SerializationError
+ * @throws StreamingException
+ */
+ public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+ throws ConnectionError, SerializationError, StreamingException {
+ super(endPoint, conf, conn);
+ this.serde = createSerde(tbl, conf, regex);
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ recordObjInspector = (StructObjectInspector) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ }
+
+ @Override
+ public AbstractSerDe getSerde() {
+ return serde;
+ }
+
+ @Override
+ protected StructObjectInspector getRecordObjectInspector() {
+ return recordObjInspector;
+ }
+
+ @Override
+ protected StructField[] getBucketStructFields() {
+ return bucketStructFields;
+ }
+
+ @Override
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
+
+ @Override
+ public void write(long transactionId, byte[] record)
+ throws StreamingIOFailure, SerializationError {
+ try {
+ Object encodedRow = encode(record);
+ int bucket = getBucket(encodedRow);
+ getRecordUpdater(bucket).insert(transactionId, encodedRow);
+ } catch (IOException e) {
+ throw new StreamingIOFailure("Error writing record in transaction("
+ + transactionId + ")", e);
+ }
+ }
+
+ /**
+ * Creates RegexSerDe
+ * @param tbl used to create serde
+ * @param conf used to create serde
+ * @param regex used to create serde
+ * @return the initialized RegexSerDe
+ * @throws SerializationError if serde could not be initialized
+ */
+ private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+ throws SerializationError {
+ try {
+ Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+ tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+ ArrayList<String> tableColumns = getCols(tbl);
+ tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+ RegexSerDe serde = new RegexSerDe();
+ SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+ return serde;
+ } catch (SerDeException e) {
+ throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
+ }
+ }
+
+ private static ArrayList<String> getCols(Table table) {
+ List<FieldSchema> cols = table.getSd().getCols();
+ ArrayList<String> colNames = new ArrayList<String>(cols.size());
+ for (FieldSchema col : cols) {
+ colNames.add(col.getName().toLowerCase());
+ }
+ return colNames;
+ }
+
+ /**
+ * Encode UTF-8 encoded string bytes using RegexSerDe
+ *
+ * @param utf8StrRecord
+ * @return The encoded object
+ * @throws SerializationError
+ */
+ @Override
+ public Object encode(byte[] utf8StrRecord) throws SerializationError {
+ try {
+ Text blob = new Text(utf8StrRecord);
+ return serde.deserialize(blob);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to convert byte[] record into Object", e);
+ }
+ }
+
+}
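For orientation, the new writer plugs into the streaming API the same way the existing JSON and delimited writers do; the tests below exercise exactly this path. A condensed usage sketch (metastore URI, database, and table names are placeholders; error handling omitted):

  import org.apache.hive.hcatalog.streaming.HiveEndPoint;
  import org.apache.hive.hcatalog.streaming.StreamingConnection;
  import org.apache.hive.hcatalog.streaming.StrictRegexWriter;
  import org.apache.hive.hcatalog.streaming.TransactionBatch;

  public class RegexStreamingExample {
    public static void main(String[] args) throws Exception {
      HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "default", "alerts", null);
      StreamingConnection conn = endPt.newConnection(true);
      // each capture group of the regex maps to one table column via RegexSerDe
      StrictRegexWriter writer = new StrictRegexWriter("([^,]*),(.*)", endPt, null, conn);

      TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
      txnBatch.beginNextTransaction();
      txnBatch.write("1,Hello streaming".getBytes());
      txnBatch.commit();
      txnBatch.close();
      conn.close();
    }
  }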
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index bf29993..097de9b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,10 +64,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.tools.FileDump;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -82,11 +78,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
@@ -485,9 +485,9 @@ public class TestStreaming {
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
- for (int i = 0; i < records.length; i++) {
+ for (String record : records) {
Assert.assertEquals(true, rr.next(key, value));
- Assert.assertEquals(records[i], value.toString());
+ Assert.assertEquals(record, value.toString());
}
Assert.assertEquals(false, rr.next(key, value));
}
@@ -741,7 +741,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -753,11 +753,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -787,6 +787,75 @@ public class TestStreaming {
}
@Test
+ public void testTransactionBatchCommit_Regex() throws Exception {
+ testTransactionBatchCommit_Regex(null);
+ }
+ @Test
+ public void testTransactionBatchCommit_RegexUGI() throws Exception {
+ testTransactionBatchCommit_Regex(Utils.getUGI());
+ }
+ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+ partitionVals);
+ StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ String regex = "([^,]*),(.*)";
+ StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+ // 1st Txn
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1,Hello streaming".getBytes());
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+
+ // 2nd Txn
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("2,Welcome to streaming".getBytes());
+
+ // data should not be visible
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+
+ txnBatch.commit();
+
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
+
+ txnBatch.close();
+ Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+ , txnBatch.getCurrentTransactionState());
+
+
+ connection.close();
+
+
+ // To Unpartitioned table
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+ regex = "([^:]*):(.*)";
+ writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+ // 1st Txn
+ txnBatch = connection.fetchTransactionBatch(10, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN
+ , txnBatch.getCurrentTransactionState());
+ txnBatch.write("1:Hello streaming".getBytes());
+ txnBatch.commit();
+
+ Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+ , txnBatch.getCurrentTransactionState());
+ connection.close();
+ }
+
+ @Test
public void testTransactionBatchCommit_Json() throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
partitionVals);
@@ -802,7 +871,7 @@ public class TestStreaming {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -929,7 +998,7 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -948,13 +1017,13 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -965,14 +1034,14 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1009,11 +1078,11 @@ public class TestStreaming {
txnBatch2.commit();
- checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -1021,17 +1090,17 @@ public class TestStreaming {
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}");
txnBatch2.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1700,7 +1769,7 @@ public class TestStreaming {
txnBatch.heartbeat();//this is no-op on closed batch
txnBatch.abort();//ditto
GetOpenTxnsInfoResponse r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
List<TxnInfo> ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1764,7 +1833,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1787,7 +1856,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/java-client/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml
index 3b53664..3bb9f4d 100644
--- a/hcatalog/webhcat/java-client/pom.xml
+++ b/hcatalog/webhcat/java-client/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
index b9cb067..86d3acb 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
@@ -434,7 +434,7 @@ public class TestHCatClient {
HCatClient client = HCatClient.create(new Configuration(hcatConf));
boolean isExceptionCaught = false;
// Table creation with a long table name causes ConnectionFailureException
- final String tableName = "Temptable" + new BigInteger(200, new Random()).toString(2);
+ final String tableName = "Temptable" + new BigInteger(260, new Random()).toString(2);
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
cols.add(new HCatFieldSchema("id", Type.INT, "id columns"));
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/pom.xml b/hcatalog/webhcat/svr/pom.xml
index c5ad387..a55ffe9 100644
--- a/hcatalog/webhcat/svr/pom.xml
+++ b/hcatalog/webhcat/svr/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -45,9 +45,50 @@
<artifactId>hive-hcatalog-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-runner</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-sslengine</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- inter-project -->
<dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-rewrite</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>${jersey.version}</version>
@@ -93,11 +134,6 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty.aggregate</groupId>
- <artifactId>jetty-all-server</artifactId>
- <version>${jetty.version}</version>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
@@ -107,7 +143,7 @@
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<exclusions>
- <exclusion>
+ <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
@@ -121,16 +157,42 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- test inter-project -->
<dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 54d0907..0ea7d88 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -111,6 +111,43 @@ public class AppConfig extends Configuration {
public static final String MR_AM_MEMORY_MB = "templeton.mr.am.memory.mb";
public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder";
+ /*
+ * These parameters control the maximum number of concurrent job submit/status/list
+ * operations in the templeton service. If more concurrent requests arrive, they
+ * are rejected with a BusyException.
+ */
+ public static final String JOB_SUBMIT_MAX_THREADS = "templeton.parallellism.job.submit";
+ public static final String JOB_STATUS_MAX_THREADS = "templeton.parallellism.job.status";
+ public static final String JOB_LIST_MAX_THREADS = "templeton.parallellism.job.list";
+
+ /*
+ * These parameters control the maximum time a job submit/status/list operation may
+ * execute in the templeton service. On timeout, the execution is interrupted and a
+ * TimeoutException is returned to the client.
+ * For list and status operations, no further action is needed as they are read requests.
+ * For the submit operation, we make a best effort to kill the job if it was started.
+ * Enabling this parameter may have the following side effects:
+ * 1) A job may remain active for some time after the client gets a response for the
+ * submit operation, so a subsequent list operation could potentially show the newly
+ * created job, which may eventually be killed with no guarantees.
+ * 2) If the submit operation is retried by the client, duplicate jobs may be triggered.
+ *
+ * Timeout configs should be specified in seconds.
+ *
+ */
+ public static final String JOB_SUBMIT_TIMEOUT = "templeton.job.submit.timeout";
+ public static final String JOB_STATUS_TIMEOUT = "templeton.job.status.timeout";
+ public static final String JOB_LIST_TIMEOUT = "templeton.job.list.timeout";
+
+ /*
+ * If a task execution timeout is configured for the submit operation, the job may need
+ * to be killed on execution timeout. These parameters control the maximum number of
+ * retries and the retry wait time in seconds for executing the timeout task.
+ */
+ public static final String JOB_TIMEOUT_TASK_RETRY_COUNT = "templeton.job.timeout.task.retry.count";
+ public static final String JOB_TIMEOUT_TASK_RETRY_INTERVAL = "templeton.job.timeout.task.retry.interval";
+
/**
* see webhcat-default.xml
*/
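For a sense of how these keys are consumed, a minimal sketch of the lookups (mirroring what JobRequestExecutor below does; a default of 0 disables the thread pool or the timeout respectively; the class below is illustrative only):

  import org.apache.hive.hcatalog.templeton.AppConfig;
  import org.apache.hive.hcatalog.templeton.Main;

  class TempletonKnobsSketch {
    static void readSubmitKnobs() {
      AppConfig appConf = Main.getAppConfigInstance();
      // number of concurrent submit requests; 0 means no thread pool is created
      int submitThreads = appConf.getInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 0);
      // per-request timeout in seconds; 0 means the request never times out
      int submitTimeoutSec = appConf.getInt(AppConfig.JOB_SUBMIT_TIMEOUT, 0);
    }
  }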
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
index 4b2dfec..622f92d 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.security.UserGroupInformation;
@@ -41,10 +42,11 @@ public class DeleteDelegator extends TempletonDelegator {
public QueueStatusBean run(String user, String id)
throws NotAuthorizedException, BadParam, IOException, InterruptedException
{
- UserGroupInformation ugi = UgiFactory.getUgi(user);
+ UserGroupInformation ugi = null;
WebHCatJTShim tracker = null;
JobState state = null;
try {
+ ugi = UgiFactory.getUgi(user);
tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
JobID jobid = StatusDelegator.StringToJobID(id);
if (jobid == null)
@@ -69,6 +71,8 @@ public class DeleteDelegator extends TempletonDelegator {
tracker.close();
if (state != null)
state.close();
+ if (ugi != null)
+ FileSystem.closeAllForUGI(ugi);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index f0296cb..1953028 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -49,7 +49,7 @@ public class HiveDelegator extends LauncherDelegator {
String statusdir, String callback, String completedUrl, boolean enablelog,
Boolean enableJobReconnect)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException
{
runAs = user;
List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir,
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 84cd5b9..1246b40 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -46,7 +46,7 @@ public class JarDelegator extends LauncherDelegator {
boolean usesHcatalog, String completedUrl,
boolean enablelog, Boolean enableJobReconnect, JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException {
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException {
runAs = user;
List<String> args = makeArgs(jar, mainClass,
libjars, files, jarArgs, defines,
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
new file mode 100644
index 0000000..e703eff
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class JobCallable<T> implements Callable<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(JobCallable.class);
+
+ static public enum JobState {
+ STARTED,
+ FAILED,
+ COMPLETED
+ }
+
+ /*
+ * Job state of the job request. Changes to the state are synchronized using
+ * setStateAndResult. This is required because two different threads, the
+ * main thread and the job execution thread, may try to change the state and
+ * organize clean up tasks.
+ */
+ private JobState jobState = JobState.STARTED;
+
+ /*
+ * Result of the JobCallable task after successful task completion. This is
+ * expected to be set by the thread which executes the JobCallable task.
+ */
+ public T returnResult = null;
+
+ /*
+ * Sets the job state to FAILED. Returns true if FAILED status is set.
+ * Otherwise, it returns false.
+ */
+ public boolean setJobStateFailed() {
+ return setStateAndResult(JobState.FAILED, null);
+ }
+
+ /*
+ * Sets the job state to COMPLETED and also sets the result value. Returns true
+ * if COMPLETED status is set. Otherwise, it returns false.
+ */
+ public boolean setJobStateCompleted(T result) {
+ return setStateAndResult(JobState.COMPLETED, result);
+ }
+
+ /*
+ * Sets the job state and result. Returns true if status and result are set.
+ * Otherwise, it returns false.
+ */
+ private synchronized boolean setStateAndResult(JobState jobState, T result) {
+ if (this.jobState == JobState.STARTED) {
+ this.jobState = jobState;
+ this.returnResult = result;
+ return true;
+ } else {
+ LOG.info("Failed to set job state to " + jobState + " due to job state "
+ + this.jobState + ". Expected state is " + JobState.STARTED);
+ }
+
+ return false;
+ }
+
+ /*
+ * Executes the callable task via the execute() call and gets the result
+ * of the task. It also sets the job status to COMPLETED if the state is not
+ * already set to FAILED, and returns the result to the future.
+ */
+ public T call() throws Exception {
+
+ /*
+ * Don't catch any execution exceptions here; let the caller catch them.
+ */
+ T result = this.execute();
+
+ if (!this.setJobStateCompleted(result)) {
+ /*
+ * Failed to set the job status to COMPLETED, which means the main thread has
+ * already exited and is not waiting for the result. Call cleanup() to execute any cleanup.
+ */
+ cleanup();
+ return null;
+ }
+
+ return this.returnResult;
+ }
+
+ /*
+ * Abstract method to be overridden for task execution.
+ */
+ public abstract T execute() throws Exception;
+
+ /*
+ * Cleanup method called to run cleanup tasks if job state is FAILED. By default,
+ * no cleanup is provided.
+ */
+ public void cleanup() {}
+}
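The intended contract is easiest to see with a concrete subclass. A minimal sketch (the delegators in this patch build these inline; the class and helper below are hypothetical):

  public class SubmitCallable extends JobCallable<String> {
    @Override
    public String execute() throws Exception {
      // the real work: submit the job and return its id
      return submitJobAndGetId();
    }

    @Override
    public void cleanup() {
      // runs only when the main thread has already given up (state moved to FAILED),
      // e.g. a best-effort kill of a job whose submission completed after the timeout
    }

    private String submitJobAndGetId() {
      return "job_0000"; // hypothetical placeholder so the sketch is self-contained
    }
  }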
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
new file mode 100644
index 0000000..9ac4588
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobRequestExecutor<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(JobRequestExecutor.class);
+ private static AppConfig appConf = Main.getAppConfigInstance();
+
+ /*
+ * Thread pool to execute job requests.
+ */
+ private ThreadPoolExecutor jobExecutePool = null;
+
+ /*
+ * Type of job request.
+ */
+ private JobRequestType requestType;
+
+ /*
+ * Config name used to find the number of concurrent requests.
+ */
+ private String concurrentRequestsConfigName;
+
+ /*
+ * Config name used to find the maximum time job request can be executed.
+ */
+ private String jobTimeoutConfigName;
+
+ /*
+ * Job request execution time out in seconds. If it is 0 then request
+ * will not be timed out.
+ */
+ private int requestExecutionTimeoutInSec = 0;
+
+ /*
+ * Amount of time a thread can stay alive in the thread pool before being cleaned up.
+ * Core threads will not be cleaned up from the thread pool.
+ */
+ private int threadKeepAliveTimeInHours = 1;
+
+ /*
+ * Maximum number of times a cancel request is sent to job request execution
+ * thread. Future.cancel may not be able to interrupt the thread if it is
+ * blocked on network calls.
+ */
+ private int maxTaskCancelRetryCount = 10;
+
+ /*
+ * Wait time in milliseconds before another cancel request is made.
+ */
+ private int maxTaskCancelRetryWaitTimeInMs = 1000;
+
+ /*
+ * A flag to indicate whether to cancel the task when a TimeoutException,
+ * InterruptedException, or CancellationException is raised. The default is to cancel the thread.
+ */
+ private boolean enableCancelTask = true;
+
+ /*
+ * Job Request type.
+ */
+ public enum JobRequestType {
+ Submit,
+ Status,
+ List
+ }
+
+ /*
+ * Creates a job request object and sets up execution environment. Creates a thread pool
+ * to execute job requests.
+ *
+ * @param requestType
+ * Job request type
+ *
+ * @param concurrentRequestsConfigName
+ * Config name to be used to extract number of concurrent requests to be serviced.
+ *
+ * @param jobTimeoutConfigName
+ * Config name to be used to extract maximum time a task can execute a request.
+ *
+ * @param enableCancelTask
+ * A flag to indicate whether to cancel the task when exception TimeoutException
+ * or InterruptedException or CancellationException raised.
+ *
+ */
+ public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
+ String jobTimeoutConfigName, boolean enableCancelTask) {
+
+ this.concurrentRequestsConfigName = concurrentRequestsConfigName;
+ this.jobTimeoutConfigName = jobTimeoutConfigName;
+ this.requestType = requestType;
+ this.enableCancelTask = enableCancelTask;
+
+ /*
+ * The default number of threads is 0. That means the thread pool is not used and
+ * the operation is executed on the current thread.
+ */
+ int threads = !StringUtils.isEmpty(concurrentRequestsConfigName) ?
+ appConf.getInt(concurrentRequestsConfigName, 0) : 0;
+
+ if (threads > 0) {
+ /*
+ * Create a thread pool with no queue wait time to execute the operation. This will ensure
+ * that job requests are rejected if there are already maximum number of threads busy.
+ */
+ this.jobExecutePool = new ThreadPoolExecutor(threads, threads,
+ threadKeepAliveTimeInHours, TimeUnit.HOURS,
+ new SynchronousQueue<Runnable>());
+ this.jobExecutePool.allowCoreThreadTimeOut(true);
+
+ /*
+ * Get the job request time out value. If this configuration value is set to 0
+ * then job request will wait until it finishes.
+ */
+ if (!StringUtils.isEmpty(jobTimeoutConfigName)) {
+ this.requestExecutionTimeoutInSec = appConf.getInt(jobTimeoutConfigName, 0);
+ }
+
+ LOG.info("Configured " + threads + " threads for job request type " + this.requestType
+ + " with time out " + this.requestExecutionTimeoutInSec + " s.");
+ } else {
+ /*
+ * If no threads are configured then requests will be executed on the current thread itself.
+ */
+ LOG.info("No thread pool configured for job request type " + this.requestType);
+ }
+ }
+
+ /*
+ * Creates a job request object and sets up execution environment. Creates a thread pool
+ * to execute job requests.
+ *
+ * @param requestType
+ * Job request type
+ *
+ * @param concurrentRequestsConfigName
+ * Config name to be used to extract number of concurrent requests to be serviced.
+ *
+ * @param jobTimeoutConfigName
+ * Config name to be used to extract maximum time a task can execute a request.
+ *
+ */
+ public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
+ String jobTimeoutConfigName) {
+ this(requestType, concurrentRequestsConfigName, jobTimeoutConfigName, true);
+ }
+
+ /*
+ * Returns true if the thread pool is created and can be used for executing a job request.
+ * Otherwise, returns false.
+ */
+ public boolean isThreadPoolEnabled() {
+ return this.jobExecutePool != null;
+ }
+
+ /*
+ * Executes the job request operation. If the thread pool is not created then the job
+ * request is executed on the current thread itself.
+ *
+ * @param jobExecuteCallable
+ * Callable object to run the job request task.
+ *
+ */
+ public T execute(JobCallable<T> jobExecuteCallable) throws InterruptedException,
+ TimeoutException, TooManyRequestsException, ExecutionException {
+ /*
+ * The callable shouldn't be null to execute. The thread pool also should be configured
+ * to execute requests.
+ */
+ assert (jobExecuteCallable != null);
+ assert (this.jobExecutePool != null);
+
+ String type = this.requestType.toString().toLowerCase();
+
+ String retryMessageForConcurrentRequests = "Please wait for some time before retrying "
+ + "the operation. Please refer to the config " + concurrentRequestsConfigName
+ + " to configure concurrent requests.";
+
+ LOG.debug("Starting new " + type + " job request with time out " + this.requestExecutionTimeoutInSec
+ + "seconds.");
+ Future<T> future = null;
+
+ try {
+ future = this.jobExecutePool.submit(jobExecuteCallable);
+ } catch (RejectedExecutionException rejectedException) {
+ /*
+ * Not able to find a thread to execute the job request. Raise a busy exception so the
+ * client can retry the operation.
+ */
+ String tooManyRequestsExceptionMessage = "Unable to service the " + type + " job request as "
+ + "templeton service is busy with too many " + type + " job requests. "
+ + retryMessageForConcurrentRequests;
+
+ LOG.warn(tooManyRequestsExceptionMessage);
+ throw new TooManyRequestsException(tooManyRequestsExceptionMessage);
+ }
+
+ T result = null;
+
+ try {
+ result = this.requestExecutionTimeoutInSec > 0
+ ? future.get(this.requestExecutionTimeoutInSec, TimeUnit.SECONDS) : future.get();
+ } catch (TimeoutException e) {
+ /*
+ * See if the execution thread has just completed operation and result is available.
+ * If result is available then return the result. Otherwise, raise exception.
+ */
+ if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
+ String message = this.requestType + " job request got timed out. Please wait for some time "
+ + "before retrying the operation. Please refer to the config "
+ + jobTimeoutConfigName + " to configure job request time out.";
+ LOG.warn(message);
+
+ /*
+ * Throw TimeoutException to caller.
+ */
+ throw new TimeoutException(message);
+ }
+ } catch (InterruptedException e) {
+ /*
+ * See if the execution thread has just completed operation and result is available.
+ * If result is available then return the result. Otherwise, raise exception.
+ */
+ if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
+ String message = this.requestType + " job request got interrupted. Please wait for some time "
+ + "before retrying the operation.";
+ LOG.warn(message);
+
+ /*
+ * Throw InterruptedException to caller.
+ */
+ throw new InterruptedException(message);
+ }
+ } catch (CancellationException e) {
+ /*
+ * See if the execution thread has just completed operation and result is available.
+ * If result is available then return the result. Otherwise, raise exception.
+ */
+ if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
+ String message = this.requestType + " job request got cancelled and thread got interrupted. "
+ + "Please wait for some time before retrying the operation.";
+ LOG.warn(message);
+
+ throw new InterruptedException(message);
+ }
+ } finally {
+ /*
+ * If the task is still active and task cancellation is enabled then cancel it. This may
+ * happen if the task was interrupted or timed out.
+ */
+ if (enableCancelTask) {
+ cancelExecutePoolThread(future);
+ }
+ }
+
+ LOG.debug("Completed " + type + " job request.");
+
+ return result;
+ }
+
+ /*
+ * Initiates a cancel request to stop the thread execution and interrupt the thread.
+ * If the interruption is not handled by jobExecuteCallable then the thread may continue
+ * running to completion. The cancel call may fail in some scenarios; in that case,
+ * the call is retried until it returns true or the max retry count is reached.
+ *
+ * @param future
+ * Future object which has handle to cancel the thread.
+ *
+ */
+ private void cancelExecutePoolThread(Future<T> future) {
+ int retryCount = 0;
+ while (retryCount < this.maxTaskCancelRetryCount && !future.isDone()) {
+ LOG.info("Task is still executing the job request. Cancelling it with retry count: "
+ + retryCount);
+ if (future.cancel(true)) {
+ /*
+ * Cancelled the job request and return to client.
+ */
+ LOG.info("Cancel job request issued successfully.");
+ return;
+ }
+
+ retryCount++;
+ try {
+ Thread.sleep(this.maxTaskCancelRetryWaitTimeInMs);
+ } catch (InterruptedException e) {
+ /*
+ * Nothing to do. Just retry.
+ */
+ }
+ }
+
+ LOG.warn("Failed to cancel the job. isCancelled: " + future.isCancelled()
+ + " Retry count: " + retryCount);
+ }
+
+ /*
+ * Tries to get the job result if the job request has completed. Otherwise, sets the job
+ * status to FAILED so that the execute thread can do the necessary cleanup based on the
+ * FAILED state.
+ */
+ private T tryGetJobResultOrSetJobStateFailed(JobCallable<T> jobExecuteCallable) {
+ if (!jobExecuteCallable.setJobStateFailed()) {
+ LOG.info("Job is already COMPLETED. Returning the result.");
+ return jobExecuteCallable.returnResult;
+ } else {
+ LOG.info("Job status set to FAILED. Job clean up to be done by execute thread "
+ + "after job request is executed.");
+ return null;
+ }
+ }
+}
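
For reference, here is a minimal sketch (not part of this patch) of how a delegator wires a JobCallable into the executor above, mirroring the LauncherDelegator and ListDelegator usages below. MyDelegator, MyBean, and doWork() are illustrative placeholders; the constructor arguments reuse the AppConfig constants that appear in this patch, and the sketch assumes JobCallable.execute() is declared broadly enough to be narrowed by overrides, as the varied throws clauses in this patch suggest.

  // Sketch only: runs doWork() through the executor when a pool is configured,
  // and inline on the current thread otherwise.
  public class MyDelegator {
    static class MyBean { }

    private static JobRequestExecutor<MyBean> requestExecutor =
        new JobRequestExecutor<MyBean>(JobRequestExecutor.JobRequestType.List,
            AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT);

    MyBean doWork() {
      return new MyBean();             // stand-in for the real request work
    }

    public MyBean run() throws Exception {
      if (!requestExecutor.isThreadPoolEnabled()) {
        return doWork();               // no pool configured: current thread
      }
      // Pooled path: execute() raises TooManyRequestsException when the pool
      // is saturated and TimeoutException when the configured timeout elapses.
      return requestExecutor.execute(new JobCallable<MyBean>() {
        @Override
        public MyBean execute() {
          return doWork();
        }
      });
    }
  }
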
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index b3f44a2..9bea897 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -23,16 +23,19 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.templeton.tool.JobState;
import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
@@ -50,9 +53,26 @@ public class LauncherDelegator extends TempletonDelegator {
static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP}
private boolean secureMeatastoreAccess = false;
private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*";
+ private final String JOB_SUBMIT_EXECUTE_THREAD_PREFIX = "JobSubmitExecute";
+ private final int jobTimeoutTaskRetryCount;
+ private final int jobTimeoutTaskRetryIntervalInSec;
+
+ /**
+ * Name of the current (submitting) thread, recorded for inclusion in execution thread names.
+ */
+ private final String submitThreadId = Thread.currentThread().getName();
+
+ /**
+ * Job request executor to submit job requests.
+ */
+ private static JobRequestExecutor<EnqueueBean> jobRequest =
+ new JobRequestExecutor<EnqueueBean>(JobRequestExecutor.JobRequestType.Submit,
+ AppConfig.JOB_SUBMIT_MAX_THREADS, AppConfig.JOB_SUBMIT_TIMEOUT, false);
public LauncherDelegator(AppConfig appConf) {
super(appConf);
+ jobTimeoutTaskRetryCount = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 0);
+ jobTimeoutTaskRetryIntervalInSec = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 0);
}
public void registerJob(String id, String user, String callback,
@@ -70,19 +90,102 @@ public class LauncherDelegator extends TempletonDelegator {
}
}
+ /*
+ * Submits a job request. If a maximum number of concurrent job submit requests is
+ * configured then the request is executed on a thread from the thread pool. If a job
+ * submit time out is configured then the execution thread is interrupted when the request
+ * times out. A best effort is also made to identify whether the job was submitted and,
+ * if so, to kill it quietly.
+ */
+ public EnqueueBean enqueueController(final String user, final Map<String, Object> userArgs,
+ final String callback, final List<String> args)
+ throws NotAuthorizedException, BusyException, IOException, QueueException, TooManyRequestsException {
+
+ EnqueueBean bean = null;
+ final TempletonControllerJob controllerJob = getTempletonController();
+
+ if (jobRequest.isThreadPoolEnabled()) {
+ JobCallable<EnqueueBean> jobExecuteCallable = getJobSubmitTask(user, userArgs, callback,
+ args, controllerJob);
+ try {
+ bean = jobRequest.execute(jobExecuteCallable);
+ } catch (TimeoutException ex) {
+ /*
+ * Job request timed out. Job kill should have started. Return to the client with a
+ * QueueException.
+ */
+ throw new QueueException(ex.getMessage());
+ } catch (InterruptedException ex) {
+ /*
+ * Job request was interrupted. Job kill should have started. Return to the client
+ * with a QueueException.
+ */
+ throw new QueueException(ex.getMessage());
+ } catch (ExecutionException ex) {
+ /*
+ * ExecutionException is raised if the job execution throws an exception. Return to the
+ * client with the exception.
+ */
+ throw new QueueException(ex.getMessage());
+ }
+ } else {
+ LOG.info("No thread pool configured for submit job request. Executing "
+ + "the job request in current thread.");
+
+ bean = enqueueJob(user, userArgs, callback, args, controllerJob);
+ }
+
+ return bean;
+ }
+
+ /*
+ * Job callable task for the job submit operation. Overrides execute() to submit the
+ * job, and cleanup() to kill the job in case the submission request times out or is
+ * interrupted.
+ */
+ private JobCallable<EnqueueBean> getJobSubmitTask(final String user,
+ final Map<String, Object> userArgs, final String callback,
+ final List<String> args, final TempletonControllerJob controllerJob) {
+ return new JobCallable<EnqueueBean>() {
+ @Override
+ public EnqueueBean execute() throws NotAuthorizedException, BusyException, IOException,
+ QueueException {
+ /*
+ * Change the current thread's name to include the parent thread id when executed
+ * in the thread pool. This makes it easy to extract logs specific to a job request
+ * and to debug job issues.
+ */
+ Thread.currentThread().setName(String.format("%s-%s-%s", JOB_SUBMIT_EXECUTE_THREAD_PREFIX,
+ submitThreadId, Thread.currentThread().getId()));
+
+ return enqueueJob(user, userArgs, callback, args, controllerJob);
+ }
+
+ @Override
+ public void cleanup() {
+ /*
+ * Failed to set the job status to COMPLETED, which means the main thread has
+ * exited and is no longer waiting for the result. Kill the submitted job.
+ */
+ LOG.info("Job kill not done by main thread. Trying to kill now.");
+ killTempletonJobWithRetry(user, controllerJob.getSubmittedId());
+ }
+ };
+ }
+
/**
* Enqueue the TempletonControllerJob directly calling doAs.
*/
- public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback,
- List<String> args)
+ public EnqueueBean enqueueJob(String user, Map<String, Object> userArgs, String callback,
+ List<String> args, TempletonControllerJob controllerJob)
throws NotAuthorizedException, BusyException,
IOException, QueueException {
+ UserGroupInformation ugi = null;
try {
- UserGroupInformation ugi = UgiFactory.getUgi(user);
+ ugi = UgiFactory.getUgi(user);
final long startTime = System.nanoTime();
- String id = queueAsUser(ugi, args);
+ String id = queueAsUser(ugi, args, controllerJob);
long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
LOG.debug("queued job " + id + " in " + elapsed + " ms");
@@ -96,24 +199,91 @@ public class LauncherDelegator extends TempletonDelegator {
return new EnqueueBean(id);
} catch (InterruptedException e) {
throw new QueueException("Unable to launch job " + e);
+ } finally {
+ if (ugi != null) {
+ FileSystem.closeAllForUGI(ugi);
+ }
}
}
- private String queueAsUser(UserGroupInformation ugi, final List<String> args)
+ private String queueAsUser(UserGroupInformation ugi, final List<String> args,
+ final TempletonControllerJob controllerJob)
throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Launching job: " + args);
}
return ugi.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws Exception {
- String[] array = new String[args.size()];
- TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess, appConf);
- ToolRunner.run(ctrl, args.toArray(array));
- return ctrl.getSubmittedId();
+ runTempletonControllerJob(controllerJob, args);
+ return controllerJob.getSubmittedId();
}
});
}
+ /*
+ * Kills the templeton job, with multiple retries, if the job exists. Returns true if
+ * the kill attempt succeeds; otherwise returns false.
+ */
+ private boolean killTempletonJobWithRetry(String user, String jobId) {
+ /*
+ * Null-safe check of whether the job submission has gone through and the job id is valid.
+ */
+ if (StringUtils.startsWith(jobId, "job_")) {
+ LOG.info("Started killing the job " + jobId);
+
+ boolean success = false;
+ int count = 0;
+ do {
+ try {
+ count++;
+ killJob(user, jobId);
+ success = true;
+ LOG.info("Kill job attempt succeeded.");
+ } catch (Exception e) {
+ LOG.info("Failed to kill the job due to exception: " + e.getMessage());
+ LOG.info("Waiting for " + jobTimeoutTaskRetryIntervalInSec + "s before retrying "
+ + "the operation. Iteration: " + count);
+ try {
+ Thread.sleep(jobTimeoutTaskRetryIntervalInSec * 1000);
+ } catch (InterruptedException ex) {
+ LOG.info("Got interrupted while waiting for next retry.");
+ }
+ }
+ } while (!success && count < jobTimeoutTaskRetryCount);
+
+ return success;
+ } else {
+ LOG.info("Couldn't find a valid job id after job request is timed out.");
+ return false;
+ }
+ }
+
+ /*
+ * Gets a new templeton controller job object.
+ */
+ protected TempletonControllerJob getTempletonController() {
+ return new TempletonControllerJob(secureMeatastoreAccess, appConf);
+ }
+
+ /*
+ * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run
+ * the actual job.
+ */
+ protected int runTempletonControllerJob(TempletonControllerJob controllerJob, List<String> args)
+ throws IOException, InterruptedException, TimeoutException, Exception {
+ String[] array = new String[args.size()];
+ return ToolRunner.run(controllerJob, args.toArray(array));
+ }
+
+ /*
+ * Uses DeleteDelegator to kill a job; exceptions are propagated to the caller.
+ */
+ protected void killJob(String user, String jobId)
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException {
+ DeleteDelegator d = new DeleteDelegator(appConf);
+ d.run(user, jobId);
+ }
+
public List<String> makeLauncherArgs(AppConfig appConf, String statusdir,
String completedUrl,
List<String> copyFiles,
@@ -180,24 +350,35 @@ public class LauncherDelegator extends TempletonDelegator {
*/
private String getShimLibjars() {
WebHCatJTShim shim = null;
+ UserGroupInformation ugi = null;
try {
- shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, UserGroupInformation.getCurrentUser());
+ ugi = UserGroupInformation.getCurrentUser();
+ shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
+
+ // Besides the HiveShims jar which is Hadoop version dependent we also
+ // always need to include hive shims common jars.
+ Path shimCommonJar = new Path(
+ TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
+ Path shimCommonSecureJar = new Path(
+ TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
+ Path shimJar = new Path(
+ TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
+
+ return String.format(
+ "%s,%s,%s",
+ shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
} catch (IOException e) {
- throw new RuntimeException("Failed to get WebHCatShim", e);
+ throw new RuntimeException("Failed to get shimLibJars", e);
+ } finally {
+ try {
+ if (ugi != null) {
+ FileSystem.closeAllForUGI(ugi);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to closeAllForUGI", e);
+ }
}
- // Besides the HiveShims jar which is Hadoop version dependent we also
- // always need to include hive shims common jars.
- Path shimCommonJar = new Path(
- TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
- Path shimCommonSecureJar = new Path(
- TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
- Path shimJar = new Path(
- TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
-
- return String.format(
- "%s,%s,%s",
- shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
}
// Storage vars
@@ -263,7 +444,7 @@ public class LauncherDelegator extends TempletonDelegator {
}
/**
* This is called by subclasses when they determine that the submitted job requires
- * metastore access (e.g. Pig job that uses HCatalog). This then determines if
+ * metastore access (e.g. Pig job that uses HCatalog). This then determines if
* secure access is required and causes TempletonControllerJob to set up a delegation token.
* @see TempletonControllerJob
*/
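
The kill-with-retry logic in killTempletonJobWithRetry above is a standard bounded-retry loop; the following is a condensed, standalone sketch of the same pattern, not part of this patch (the Action interface and parameter names are illustrative; maxRetries and waitMs play the roles of the AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT and AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL settings). Unlike the original, the sketch restores the interrupt flag when the back-off sleep is interrupted.

  interface Action { void run() throws Exception; }

  // Retry the action up to maxRetries times, sleeping waitMs between attempts.
  // Mirrors the do/while in killTempletonJobWithRetry: the attempt counter is
  // incremented before the action runs, and the first success stops the loop.
  static boolean runWithRetry(Action action, int maxRetries, long waitMs) {
    int count = 0;
    do {
      try {
        count++;
        action.run();
        return true;                          // attempt succeeded
      } catch (Exception e) {
        try {
          Thread.sleep(waitMs);               // back off before the next attempt
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
      }
    } while (count < maxRetries);
    return false;                             // all attempts failed
  }
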
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
index a30ecd1..dfa59f8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
@@ -19,9 +19,15 @@
package org.apache.hive.hcatalog.templeton;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobStatus;
@@ -31,20 +37,82 @@ import org.apache.hadoop.security.UserGroupInformation;
* List jobs owned by a user.
*/
public class ListDelegator extends TempletonDelegator {
+ private static final Log LOG = LogFactory.getLog(ListDelegator.class);
+ private final String JOB_LIST_EXECUTE_THREAD_PREFIX = "JobListExecute";
+
+ /**
+ * Name of the current (listing) thread, recorded for inclusion in execution thread names.
+ */
+ private final String listThreadId = Thread.currentThread().getName();
+
+ /*
+ * Job request executor to list job status requests.
+ */
+ private static JobRequestExecutor<List<JobItemBean>> jobRequest =
+ new JobRequestExecutor<List<JobItemBean>>(JobRequestExecutor.JobRequestType.List,
+ AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT);
+
public ListDelegator(AppConfig appConf) {
super(appConf);
}
- public List<String> run(String user, boolean showall)
+ /*
+ * Lists job status requests. If a maximum number of concurrent job list requests is
+ * configured then the request is executed on a thread from the thread pool. If a job
+ * list time out is configured then the execution thread is interrupted when the request
+ * times out; no further action is taken on time out.
+ */
+ public List<JobItemBean> run(final String user, final boolean showall, final String jobId,
+ final int numRecords, final boolean showDetails)
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException,
+ TimeoutException, ExecutionException, TooManyRequestsException {
+
+ if (jobRequest.isThreadPoolEnabled()) {
+ return jobRequest.execute(getJobListTask(user, showall, jobId, numRecords, showDetails));
+ } else {
+ return listJobs(user, showall, jobId, numRecords, showDetails);
+ }
+ }
+
+ /*
+ * Job callable task for the job list operation. Overrides execute() to list jobs.
+ * There is no need to override cleanup() as there is nothing to be done if the list
+ * operation times out or is interrupted.
+ */
+ private JobCallable<List<JobItemBean>> getJobListTask(final String user, final boolean showall,
+ final String jobId, final int numRecords, final boolean showDetails) {
+ return new JobCallable<List<JobItemBean>>() {
+ @Override
+ public List<JobItemBean> execute() throws NotAuthorizedException, BadParam, IOException,
+ InterruptedException {
+ /*
+ * Change the current thread's name to include the parent thread id when executed
+ * in the thread pool. This makes it easy to extract logs specific to a job request
+ * and to debug job issues.
+ */
+ Thread.currentThread().setName(String.format("%s-%s-%s", JOB_LIST_EXECUTE_THREAD_PREFIX,
+ listThreadId, Thread.currentThread().getId()));
+
+ return listJobs(user, showall, jobId, numRecords, showDetails);
+ }
+ };
+ }
+
+ /*
+ * Gets the list of job ids and calls getJobStatus to fetch the status of each job.
+ */
+ public List<JobItemBean> listJobs(String user, boolean showall, String jobId,
+ int numRecords, boolean showDetails)
throws NotAuthorizedException, BadParam, IOException, InterruptedException {
- UserGroupInformation ugi = UgiFactory.getUgi(user);
+ UserGroupInformation ugi = null;
WebHCatJTShim tracker = null;
+ ArrayList<String> ids = new ArrayList<String>();
+
try {
+ ugi = UgiFactory.getUgi(user);
tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
- ArrayList<String> ids = new ArrayList<String>();
-
JobStatus[] jobs = tracker.getAllJobs();
if (jobs != null) {
@@ -54,13 +122,81 @@ public class ListDelegator extends TempletonDelegator {
ids.add(id);
}
}
-
- return ids;
} catch (IllegalStateException e) {
throw new BadParam(e.getMessage());
} finally {
if (tracker != null)
tracker.close();
+ if (ugi != null)
+ FileSystem.closeAllForUGI(ugi);
}
+
+ return getJobStatus(ids, user, showall, jobId, numRecords, showDetails);
+ }
+
+ /*
+ * Returns the status of each of the given jobs as a list.
+ */
+ public List<JobItemBean> getJobStatus(ArrayList<String> jobIds, String user, boolean showall,
+ String jobId, int numRecords, boolean showDetails)
+ throws IOException, InterruptedException {
+
+ List<JobItemBean> detailList = new ArrayList<JobItemBean>();
+ int currRecord = 0;
+
+ // Sort the list as requested
+ boolean isAscendingOrder = true;
+ switch (appConf.getListJobsOrder()) {
+ case lexicographicaldesc:
+ Collections.sort(jobIds, Collections.reverseOrder());
+ isAscendingOrder = false;
+ break;
+ case lexicographicalasc:
+ default:
+ Collections.sort(jobIds);
+ break;
+ }
+
+ for (String job : jobIds) {
+ // If numRecords = -1, fetch all records.
+ // Hence skip all the below checks when numRecords = -1.
+ if (numRecords != -1) {
+ // If currRecord >= numRecords, we have already fetched the top #numRecords
+ if (currRecord >= numRecords) {
+ break;
+ }
+ else if (jobId == null || jobId.trim().length() == 0) {
+ currRecord++;
+ }
+ // If the current record needs to be returned based on the
+ // filter conditions specified by the user, increment the counter
+ else if ((isAscendingOrder && job.compareTo(jobId) > 0) || (!isAscendingOrder && job.compareTo(jobId) < 0)) {
+ currRecord++;
+ }
+ // The current record should not be included in the output detailList.
+ else {
+ continue;
+ }
+ }
+ JobItemBean jobItem = new JobItemBean();
+ jobItem.id = job;
+ if (showDetails) {
+ StatusDelegator sd = new StatusDelegator(appConf);
+ try {
+ jobItem.detail = sd.run(user, job, false);
+ }
+ catch(Exception ex) {
+ /*
+ * If we could not get the status for some reason, log it and send an empty status back
+ * with just the ID so that the caller knows to look in the log file.
+ */
+ LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
+ jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
+ }
+ }
+ detailList.add(jobItem);
+ }
+
+ return detailList;
}
}
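
The numRecords/jobId handling in getJobStatus above implements cursor-style paging over the sorted id list; here is a condensed, standalone sketch of the same selection rule, not part of this patch (the method and parameter names are illustrative, and java.util.List/ArrayList imports are assumed).

  // Return at most numRecords ids that sort strictly after the cursor jobId
  // (strictly before it for descending order); numRecords == -1 means "all".
  static List<String> page(List<String> sortedIds, String cursor,
                           int numRecords, boolean ascending) {
    List<String> out = new ArrayList<String>();
    for (String id : sortedIds) {
      if (numRecords != -1) {
        if (out.size() >= numRecords) {
          break;                                    // page is full
        }
        boolean noCursor = cursor == null || cursor.trim().length() == 0;
        boolean afterCursor = noCursor
            || (ascending ? id.compareTo(cursor) > 0 : id.compareTo(cursor) < 0);
        if (!afterCursor) {
          continue;                                 // id at or before the cursor
        }
      }
      out.add(id);
    }
    return out;
  }
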
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
index 5208bf4..3ed3ece 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import org.slf4j.Logger;
@@ -43,14 +44,15 @@ import org.eclipse.jetty.rewrite.handler.RedirectPatternRule;
import org.eclipse.jetty.rewrite.handler.RewriteHandler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.FilterHolder;
-import org.eclipse.jetty.servlet.FilterMapping;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.xml.XmlConfiguration;
import org.slf4j.bridge.SLF4JBridgeHandler;
+import javax.servlet.DispatcherType;
import javax.servlet.http.HttpServletRequest;
/**
@@ -122,7 +124,7 @@ public class Main {
checkEnv();
runServer(port);
// Currently only print the first port to be consistent with old behavior
- port = ArrayUtils.isEmpty(server.getConnectors()) ? -1 : server.getConnectors()[0].getPort();
+ port = ArrayUtils.isEmpty(server.getConnectors()) ? -1 : ((ServerConnector)(server.getConnectors()[0])).getLocalPort();
System.out.println("templeton: listening on port " + port);
LOG.info("Templeton listening on port " + port);
@@ -185,6 +187,7 @@ public class Main {
// Add the Auth filter
FilterHolder fHolder = makeAuthFilter();
+ EnumSet<DispatcherType> dispatches = EnumSet.of(DispatcherType.REQUEST);
/*
* We add filters for each of the URIs supported by templeton.
@@ -193,28 +196,18 @@ public class Main {
* This is because mapreduce does not use secure credentials for
* callbacks. So jetty would fail the request as unauthorized.
*/
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*",
- FilterMapping.REQUEST);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*",
- FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", dispatches);
if (conf.getBoolean(AppConfig.XSRF_FILTER_ENABLED, false)){
- root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*",
- FilterMapping.REQUEST);
+ root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*", dispatches);
LOG.debug("XSRF filter enabled");
} else {
LOG.warn("XSRF filter disabled");