Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:43:32 UTC
[42/51] [partial] hive git commit: Revert "HIVE-14671 : merge master into hive-14535 (Wei Zheng)"
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 6f96e1d..f7e3e3a 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.RawStoreProxy;
@@ -57,7 +56,6 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
-import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
@@ -88,17 +86,23 @@ public class DbNotificationListener extends MetaStoreEventListener {
// HiveConf rather than a Configuration.
private HiveConf hiveConf;
private MessageFactory msgFactory;
-
- private synchronized void init(HiveConf conf) throws MetaException {
- if (cleaner == null) {
- cleaner =
- new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf,
- conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999));
+ private RawStore rs;
+
+ private synchronized void init(HiveConf conf) {
+ try {
+ rs = RawStoreProxy.getProxy(conf, conf,
+ conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999);
+ } catch (MetaException e) {
+ LOG.error("Unable to connect to raw store, notifications will not be tracked", e);
+ rs = null;
+ }
+ if (cleaner == null && rs != null) {
+ cleaner = new CleanerThread(conf, rs);
cleaner.start();
}
}
- public DbNotificationListener(Configuration config) throws MetaException {
+ public DbNotificationListener(Configuration config) {
super(config);
// The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor
// with a Configuration parameter, so we have to declare config as Configuration. But it
@@ -138,7 +142,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event, tableEvent);
+ process(event);
}
/**
@@ -153,7 +157,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropTableMessage(t).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event, tableEvent);
+ process(event);
}
/**
@@ -166,10 +170,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
Table after = tableEvent.getNewTable();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
- .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString());
+ .buildAlterTableMessage(before, after).toString());
event.setDbName(after.getDbName());
event.setTableName(after.getTableName());
- process(event, tableEvent);
+ process(event);
}
class FileIterator implements Iterator<String> {
@@ -277,7 +281,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event, partitionEvent);
+ process(event);
}
/**
@@ -292,7 +296,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event, partitionEvent);
+ process(event);
}
/**
@@ -305,10 +309,10 @@ public class DbNotificationListener extends MetaStoreEventListener {
Partition after = partitionEvent.getNewPartition();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
- .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString());
+ .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
event.setDbName(before.getDbName());
event.setTableName(before.getTableName());
- process(event, partitionEvent);
+ process(event);
}
/**
@@ -322,7 +326,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
.buildCreateDatabaseMessage(db).toString());
event.setDbName(db.getName());
- process(event, dbEvent);
+ process(event);
}
/**
@@ -336,7 +340,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
.buildDropDatabaseMessage(db).toString());
event.setDbName(db.getName());
- process(event, dbEvent);
+ process(event);
}
/**
@@ -350,7 +354,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
.buildCreateFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- process(event, fnEvent);
+ process(event);
}
/**
@@ -364,7 +368,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
.buildDropFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- process(event, fnEvent);
+ process(event);
}
/**
@@ -378,7 +382,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
.buildCreateIndexMessage(index).toString());
event.setDbName(index.getDbName());
- process(event, indexEvent);
+ process(event);
}
/**
@@ -392,7 +396,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
.buildDropIndexMessage(index).toString());
event.setDbName(index.getDbName());
- process(event, indexEvent);
+ process(event);
}
/**
@@ -407,7 +411,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
.buildAlterIndexMessage(before, after).toString());
event.setDbName(before.getDbName());
- process(event, indexEvent);
+ process(event);
}
class FileChksumIterator implements Iterator<String> {
@@ -439,12 +443,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
public void onInsert(InsertEvent insertEvent) throws MetaException {
NotificationEvent event =
new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
- insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(),
+ insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
.toString());
event.setDbName(insertEvent.getDb());
event.setTableName(insertEvent.getTable());
- process(event, insertEvent);
+ process(event);
}
/**
@@ -468,27 +472,18 @@ public class DbNotificationListener extends MetaStoreEventListener {
return (int)millis;
}
- /**
- * Process this notification by adding it to metastore DB.
- *
- * @param event NotificationEvent is the object written to the metastore DB.
- * @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set the
- * DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
- */
- private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
+ // Process this notification by adding it to metastore DB
+ private void process(NotificationEvent event) {
event.setMessageFormat(msgFactory.getMessageFormat());
- synchronized (NOTIFICATION_TBL_LOCK) {
- LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
- event.getMessage());
- HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
- }
-
- // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
- if (event.isSetEventId()) {
- listenerEvent.putParameter(
- MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
- Long.toString(event.getEventId()));
+ if (rs != null) {
+ synchronized (NOTIFICATION_TBL_LOCK) {
+ LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
+ event.getMessage());
+ rs.addNotificationEvent(event);
}
+ } else {
+ LOG.warn("Dropping event " + event + " since notification is not running.");
+ }
}
private static class CleanerThread extends Thread {
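
The hunks above revert process() from the HMSHandler-backed store access to a RawStore proxy owned by the listener itself (dropping events with a warning when the store is unavailable), and remove the step that published the persisted event id back onto the ListenerEvent. For context, a minimal sketch of what that removed parameter enabled for a listener registered after DbNotificationListener; the listener class is hypothetical, and it assumes ListenerEvent exposes its parameter map via getParameters():

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.CreateTableEvent;

    public class EventIdEchoListener extends MetaStoreEventListener {
      public EventIdEchoListener(Configuration config) {
        super(config);
      }

      @Override
      public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
        // Pre-revert, DbNotificationListener stored the metastore event id under
        // DB_NOTIFICATION_EVENT_ID_KEY_NAME so later listeners could correlate with it.
        Map<String, String> params = tableEvent.getParameters(); // assumed accessor
        String eventId = params.get("DB_NOTIFICATION_EVENT_ID_KEY_NAME");
        // ... correlate eventId with external systems ...
      }
    }
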
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
deleted file mode 100644
index a4f2d59..0000000
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.hcatalog.listener;
-
-/**
- * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent
- * parameters.
- */
-public class MetaStoreEventListenerConstants {
- /*
- * DbNotificationListener keys reserved for updating ListenerEvent parameters.
- *
- * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key will have the event identifier that DbNotificationListener
- * processed during an event. This event identifier might be shared
- * across other MetaStoreEventListener implementations.
- */
- public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME";
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index 5bea0a6..e765305 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
deleted file mode 100644
index 78987ab..0000000
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.hcatalog.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.RegexSerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.Text;
-
-/**
- * Streaming Writer handles text input data with regex. Uses
- * org.apache.hadoop.hive.serde2.RegexSerDe
- */
-public class StrictRegexWriter extends AbstractRecordWriter {
- private RegexSerDe serde;
- private final StructObjectInspector recordObjInspector;
- private final ObjectInspector[] bucketObjInspectors;
- private final StructField[] bucketStructFields;
-
- /**
- * @param endPoint the end point to write to
- * @param conn connection this Writer is to be used with
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- this(null, endPoint, null, conn);
- }
-
- /**
- * @param endPoint the end point to write to
- * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
- * @param conn connection this Writer is to be used with
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- this(null, endPoint, conf, conn);
- }
-
- /**
- * @param regex to parse the data
- * @param endPoint the end point to write to
- * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
- * @param conn connection this Writer is to be used with
- * @throws ConnectionError
- * @throws SerializationError
- * @throws StreamingException
- */
- public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
- throws ConnectionError, SerializationError, StreamingException {
- super(endPoint, conf, conn);
- this.serde = createSerde(tbl, conf, regex);
- // get ObjInspectors for entire record and bucketed cols
- try {
- recordObjInspector = ( StructObjectInspector ) serde.getObjectInspector();
- this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
- } catch (SerDeException e) {
- throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
- }
-
- // get StructFields for bucketed cols
- bucketStructFields = new StructField[bucketIds.size()];
- List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
- for (int i = 0; i < bucketIds.size(); i++) {
- bucketStructFields[i] = allFields.get(bucketIds.get(i));
- }
- }
-
- @Override
- public AbstractSerDe getSerde() {
- return serde;
- }
-
- @Override
- protected StructObjectInspector getRecordObjectInspector() {
- return recordObjInspector;
- }
-
- @Override
- protected StructField[] getBucketStructFields() {
- return bucketStructFields;
- }
-
- @Override
- protected ObjectInspector[] getBucketObjectInspectors() {
- return bucketObjInspectors;
- }
-
-
- @Override
- public void write(long transactionId, byte[] record)
- throws StreamingIOFailure, SerializationError {
- try {
- Object encodedRow = encode(record);
- int bucket = getBucket(encodedRow);
- getRecordUpdater(bucket).insert(transactionId, encodedRow);
- } catch (IOException e) {
- throw new StreamingIOFailure("Error writing record in transaction("
- + transactionId + ")", e);
- }
- }
-
- /**
- * Creates RegexSerDe
- * @param tbl used to create serde
- * @param conf used to create serde
- * @param regex used to create serde
- * @return
- * @throws SerializationError if serde could not be initialized
- */
- private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
- throws SerializationError {
- try {
- Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
- tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
- ArrayList<String> tableColumns = getCols(tbl);
- tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
- RegexSerDe serde = new RegexSerDe();
- SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
- return serde;
- } catch (SerDeException e) {
- throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
- }
- }
-
- private static ArrayList<String> getCols(Table table) {
- List<FieldSchema> cols = table.getSd().getCols();
- ArrayList<String> colNames = new ArrayList<String>(cols.size());
- for (FieldSchema col : cols) {
- colNames.add(col.getName().toLowerCase());
- }
- return colNames;
- }
-
- /**
- * Encode Utf8 encoded string bytes using RegexSerDe
- *
- * @param utf8StrRecord
- * @return The encoded object
- * @throws SerializationError
- */
- @Override
- public Object encode(byte[] utf8StrRecord) throws SerializationError {
- try {
- Text blob = new Text(utf8StrRecord);
- return serde.deserialize(blob);
- } catch (SerDeException e) {
- throw new SerializationError("Unable to convert byte[] record into Object", e);
- }
- }
-
-}
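
The deletion above removes the regex-based streaming writer. Its parsing core is small enough to show standalone; a minimal sketch distilled from the deleted createSerde()/encode() methods, with hypothetical column names, and with column types supplied explicitly since no Table metadata is available here:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.RegexSerDe;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.hive.serde2.SerDeUtils;
    import org.apache.hadoop.io.Text;

    public class RegexParseSketch {
      public static Object parse(Configuration conf, byte[] utf8Record) throws SerDeException {
        Properties props = new Properties();
        props.setProperty(RegexSerDe.INPUT_REGEX, "([^,]*),(.*)");       // as in the deleted test
        props.setProperty(serdeConstants.LIST_COLUMNS, "id,msg");        // hypothetical columns
        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string");
        RegexSerDe serde = new RegexSerDe();
        SerDeUtils.initializeSerDe(serde, conf, props, null);            // as in createSerde()
        return serde.deserialize(new Text(utf8Record));                  // as in encode()
      }
    }
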
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 097de9b..bf29993 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,6 +64,10 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -78,15 +82,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
-import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
@@ -485,9 +485,9 @@ public class TestStreaming {
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
- for (String record : records) {
+ for (int i = 0; i < records.length; i++) {
Assert.assertEquals(true, rr.next(key, value));
- Assert.assertEquals(record, value.toString());
+ Assert.assertEquals(records[i], value.toString());
}
Assert.assertEquals(false, rr.next(key, value));
}
@@ -741,7 +741,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -753,11 +753,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -787,75 +787,6 @@ public class TestStreaming {
}
@Test
- public void testTransactionBatchCommit_Regex() throws Exception {
- testTransactionBatchCommit_Regex(null);
- }
- @Test
- public void testTransactionBatchCommit_RegexUGI() throws Exception {
- testTransactionBatchCommit_Regex(Utils.getUGI());
- }
- private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
- HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
- partitionVals);
- StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
- String regex = "([^,]*),(.*)";
- StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
-
- // 1st Txn
- TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
- txnBatch.beginNextTransaction();
- Assert.assertEquals(TransactionBatch.TxnState.OPEN
- , txnBatch.getCurrentTransactionState());
- txnBatch.write("1,Hello streaming".getBytes());
- txnBatch.commit();
-
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
-
- Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
- , txnBatch.getCurrentTransactionState());
-
- // 2nd Txn
- txnBatch.beginNextTransaction();
- Assert.assertEquals(TransactionBatch.TxnState.OPEN
- , txnBatch.getCurrentTransactionState());
- txnBatch.write("2,Welcome to streaming".getBytes());
-
- // data should not be visible
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
-
- txnBatch.commit();
-
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
-
- txnBatch.close();
- Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
- , txnBatch.getCurrentTransactionState());
-
-
- connection.close();
-
-
- // To Unpartitioned table
- endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
- connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
- regex = "([^:]*):(.*)";
- writer = new StrictRegexWriter(regex, endPt, conf, connection);
-
- // 1st Txn
- txnBatch = connection.fetchTransactionBatch(10, writer);
- txnBatch.beginNextTransaction();
- Assert.assertEquals(TransactionBatch.TxnState.OPEN
- , txnBatch.getCurrentTransactionState());
- txnBatch.write("1:Hello streaming".getBytes());
- txnBatch.commit();
-
- Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
- , txnBatch.getCurrentTransactionState());
- connection.close();
- }
-
- @Test
public void testTransactionBatchCommit_Json() throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
partitionVals);
@@ -871,7 +802,7 @@ public class TestStreaming {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -998,7 +929,7 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1017,13 +948,13 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1034,14 +965,14 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1078,11 +1009,11 @@ public class TestStreaming {
txnBatch2.commit();
- checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -1090,17 +1021,17 @@ public class TestStreaming {
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}");
txnBatch2.commit();
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1769,7 +1700,7 @@ public class TestStreaming {
txnBatch.heartbeat();//this is no-op on closed batch
txnBatch.abort();//ditto
GetOpenTxnsInfoResponse r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
List<TxnInfo> ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1833,7 +1764,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1856,7 +1787,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/java-client/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml
index 3bb9f4d..3b53664 100644
--- a/hcatalog/webhcat/java-client/pom.xml
+++ b/hcatalog/webhcat/java-client/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
index 86d3acb..b9cb067 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
@@ -434,7 +434,7 @@ public class TestHCatClient {
HCatClient client = HCatClient.create(new Configuration(hcatConf));
boolean isExceptionCaught = false;
// Table creation with a long table name causes ConnectionFailureException
- final String tableName = "Temptable" + new BigInteger(260, new Random()).toString(2);
+ final String tableName = "Temptable" + new BigInteger(200, new Random()).toString(2);
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
cols.add(new HCatFieldSchema("id", Type.INT, "id columns"));
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/pom.xml b/hcatalog/webhcat/svr/pom.xml
index a55ffe9..c5ad387 100644
--- a/hcatalog/webhcat/svr/pom.xml
+++ b/hcatalog/webhcat/svr/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -45,50 +45,9 @@
<artifactId>hive-hcatalog-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-runner</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-sslengine</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!-- inter-project -->
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-rewrite</artifactId>
- <version>${jetty.version}</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- <version>${jetty.version}</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- <version>${jetty.version}</version>
- </dependency>
- <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>${jersey.version}</version>
@@ -134,6 +93,11 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all-server</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
@@ -143,7 +107,7 @@
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<exclusions>
- <exclusion>
+ <exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
@@ -157,42 +121,16 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!-- test inter-project -->
<dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 0ea7d88..54d0907 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -111,43 +111,6 @@ public class AppConfig extends Configuration {
public static final String MR_AM_MEMORY_MB = "templeton.mr.am.memory.mb";
public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder";
- /*
- * These parameters controls the maximum number of concurrent job submit/status/list
- * operations in templeton service. If more number of concurrent requests comes then
- * they will be rejected with BusyException.
- */
- public static final String JOB_SUBMIT_MAX_THREADS = "templeton.parallellism.job.submit";
- public static final String JOB_STATUS_MAX_THREADS = "templeton.parallellism.job.status";
- public static final String JOB_LIST_MAX_THREADS = "templeton.parallellism.job.list";
-
- /*
- * These parameters controls the maximum time job submit/status/list operation is
- * executed in templeton service. On time out, the execution is interrupted and
- * TimeoutException is returned to client. On time out
- * For list and status operation, there is no action needed as they are read requests.
- * For submit operation, we do best effort to kill the job if its generated. Enabling
- * this parameter may have following side effects
- * 1) There is a possibility for having active job for some time when the client gets
- * response for submit operation and a list operation from client could potential
- * show the newly created job which may eventually be killed with no guarantees.
- * 2) If submit operation retried by client then there is a possibility of duplicate
- * jobs triggered.
- *
- * Time out configs should be configured in seconds.
- *
- */
- public static final String JOB_SUBMIT_TIMEOUT = "templeton.job.submit.timeout";
- public static final String JOB_STATUS_TIMEOUT = "templeton.job.status.timeout";
- public static final String JOB_LIST_TIMEOUT = "templeton.job.list.timeout";
-
- /*
- * If task execution time out is configured for submit operation then job may need to
- * be killed on execution time out. These parameters controls the maximum number of
- * retries and retry wait time in seconds for executing the time out task.
- */
- public static final String JOB_TIMEOUT_TASK_RETRY_COUNT = "templeton.job.timeout.task.retry.count";
- public static final String JOB_TIMEOUT_TASK_RETRY_INTERVAL = "templeton.job.timeout.task.retry.interval";
-
/**
* see webhcat-default.xml
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
index 622f92d..4b2dfec 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.security.UserGroupInformation;
@@ -42,11 +41,10 @@ public class DeleteDelegator extends TempletonDelegator {
public QueueStatusBean run(String user, String id)
throws NotAuthorizedException, BadParam, IOException, InterruptedException
{
- UserGroupInformation ugi = null;
+ UserGroupInformation ugi = UgiFactory.getUgi(user);
WebHCatJTShim tracker = null;
JobState state = null;
try {
- ugi = UgiFactory.getUgi(user);
tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
JobID jobid = StatusDelegator.StringToJobID(id);
if (jobid == null)
@@ -71,8 +69,6 @@ public class DeleteDelegator extends TempletonDelegator {
tracker.close();
if (state != null)
state.close();
- if (ugi != null)
- FileSystem.closeAllForUGI(ugi);
}
}
}
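
The DeleteDelegator hunks revert to creating the UGI outside the try block and drop the FileSystem.closeAllForUGI() cleanup from the finally clause. closeAllForUGI is the stock Hadoop API for evicting a user's cached FileSystem instances; a minimal sketch of the pattern the pre-revert code used, with the work body elided:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiCleanupSketch {
      public static void runAs(UserGroupInformation ugi) throws Exception {
        try {
          // ... perform work on behalf of ugi (e.g. talk to the job tracker) ...
        } finally {
          FileSystem.closeAllForUGI(ugi); // evict this user's cached FileSystem instances
        }
      }
    }
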
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 1953028..f0296cb 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -49,7 +49,7 @@ public class HiveDelegator extends LauncherDelegator {
String statusdir, String callback, String completedUrl, boolean enablelog,
Boolean enableJobReconnect)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException
+ ExecuteException, IOException, InterruptedException
{
runAs = user;
List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir,
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 1246b40..84cd5b9 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -46,7 +46,7 @@ public class JarDelegator extends LauncherDelegator {
boolean usesHcatalog, String completedUrl,
boolean enablelog, Boolean enableJobReconnect, JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+ ExecuteException, IOException, InterruptedException {
runAs = user;
List<String> args = makeArgs(jar, mainClass,
libjars, files, jarArgs, defines,
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
deleted file mode 100644
index e703eff..0000000
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.util.concurrent.Callable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class JobCallable<T> implements Callable<T> {
- private static final Logger LOG = LoggerFactory.getLogger(JobCallable.class);
-
- static public enum JobState {
- STARTED,
- FAILED,
- COMPLETED
- }
-
- /*
- * Job state of job request. Changes to the state are synchronized using
- * setStateAndResult. This is required due to two different threads,
- * main thread and job execute thread, tries to change state and organize
- * clean up tasks.
- */
- private JobState jobState = JobState.STARTED;
-
- /*
- * Result of JobCallable task after successful task completion. This is
- * expected to be set by the thread which executes JobCallable task.
- */
- public T returnResult = null;
-
- /*
- * Sets the job state to FAILED. Returns true if FAILED status is set.
- * Otherwise, it returns false.
- */
- public boolean setJobStateFailed() {
- return setStateAndResult(JobState.FAILED, null);
- }
-
- /*
- * Sets the job state to COMPLETED and also sets the results value. Returns true
- * if COMPLETED status is set. Otherwise, it returns false.
- */
- public boolean setJobStateCompleted(T result) {
- return setStateAndResult(JobState.COMPLETED, result);
- }
-
- /*
- * Sets the job state and result. Returns true if status and result are set.
- * Otherwise, it returns false.
- */
- private synchronized boolean setStateAndResult(JobState jobState, T result) {
- if (this.jobState == JobState.STARTED) {
- this.jobState = jobState;
- this.returnResult = result;
- return true;
- } else {
- LOG.info("Failed to set job state to " + jobState + " due to job state "
- + this.jobState + ". Expected state is " + JobState.STARTED);
- }
-
- return false;
- }
-
- /*
- * Executes the callable task with help of execute() call and gets the result
- * of the task. It also sets job status as COMPLETED if state is not already
- * set to FAILED and returns result to future.
- */
- public T call() throws Exception {
-
- /*
- * Don't catch any execution exceptions here and let the caller catch it.
- */
- T result = this.execute();
-
- if (!this.setJobStateCompleted(result)) {
- /*
- * Failed to set job status as COMPLETED which mean the main thread would have
- * exited and not waiting for the result. Call cleanup() to execute any cleanup.
- */
- cleanup();
- return null;
- }
-
- return this.returnResult;
- }
-
- /*
- * Abstract method to be overridden for task execution.
- */
- public abstract T execute() throws Exception;
-
- /*
- * Cleanup method called to run cleanup tasks if job state is FAILED. By default,
- * no cleanup is provided.
- */
- public void cleanup() {}
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
deleted file mode 100644
index 9ac4588..0000000
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JobRequestExecutor<T> {
- private static final Logger LOG = LoggerFactory.getLogger(JobRequestExecutor.class);
- private static AppConfig appConf = Main.getAppConfigInstance();
-
- /*
- * Thread pool to execute job requests.
- */
- private ThreadPoolExecutor jobExecutePool = null;
-
- /*
- * Type of job request.
- */
- private JobRequestType requestType;
-
- /*
- * Config name used to find the number of concurrent requests.
- */
- private String concurrentRequestsConfigName;
-
- /*
- * Config name used to find the maximum time job request can be executed.
- */
- private String jobTimeoutConfigName;
-
- /*
- * Job request execution time out in seconds. If it is 0 then request
- * will not be timed out.
- */
- private int requestExecutionTimeoutInSec = 0;
-
- /*
- * Amount of time a thread can be alive in thread pool before cleaning this up. Core threads
- * will not be cleanup from thread pool.
- */
- private int threadKeepAliveTimeInHours = 1;
-
- /*
- * Maximum number of times a cancel request is sent to job request execution
- * thread. Future.cancel may not be able to interrupt the thread if it is
- * blocked on network calls.
- */
- private int maxTaskCancelRetryCount = 10;
-
- /*
- * Wait time in milliseconds before another cancel request is made.
- */
- private int maxTaskCancelRetryWaitTimeInMs = 1000;
-
- /*
- * A flag to indicate whether to cancel the task when exception TimeoutException or
- * InterruptedException or CancellationException raised. The default is cancel thread.
- */
- private boolean enableCancelTask = true;
-
- /*
- * Job Request type.
- */
- public enum JobRequestType {
- Submit,
- Status,
- List
- }
-
- /*
- * Creates a job request object and sets up execution environment. Creates a thread pool
- * to execute job requests.
- *
- * @param requestType
- * Job request type
- *
- * @param concurrentRequestsConfigName
- * Config name to be used to extract number of concurrent requests to be serviced.
- *
- * @param jobTimeoutConfigName
- * Config name to be used to extract maximum time a task can execute a request.
- *
- * @param enableCancelTask
- * A flag to indicate whether to cancel the task when exception TimeoutException
- * or InterruptedException or CancellationException raised.
- *
- */
- public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
- String jobTimeoutConfigName, boolean enableCancelTask) {
-
- this.concurrentRequestsConfigName = concurrentRequestsConfigName;
- this.jobTimeoutConfigName = jobTimeoutConfigName;
- this.requestType = requestType;
- this.enableCancelTask = enableCancelTask;
-
- /*
- * The default number of threads will be 0. That means thread pool is not used and
- * operation is executed with the current thread.
- */
- int threads = !StringUtils.isEmpty(concurrentRequestsConfigName) ?
- appConf.getInt(concurrentRequestsConfigName, 0) : 0;
-
- if (threads > 0) {
- /*
- * Create a thread pool with no queue wait time to execute the operation. This will ensure
- * that job requests are rejected if there are already maximum number of threads busy.
- */
- this.jobExecutePool = new ThreadPoolExecutor(threads, threads,
- threadKeepAliveTimeInHours, TimeUnit.HOURS,
- new SynchronousQueue<Runnable>());
- this.jobExecutePool.allowCoreThreadTimeOut(true);
-
- /*
- * Get the job request time out value. If this configuration value is set to 0
- * then job request will wait until it finishes.
- */
- if (!StringUtils.isEmpty(jobTimeoutConfigName)) {
- this.requestExecutionTimeoutInSec = appConf.getInt(jobTimeoutConfigName, 0);
- }
-
- LOG.info("Configured " + threads + " threads for job request type " + this.requestType
- + " with time out " + this.requestExecutionTimeoutInSec + " s.");
- } else {
- /*
- * If threads are not configured then they will be executed in current thread itself.
- */
- LOG.info("No thread pool configured for job request type " + this.requestType);
- }
- }
-
- /*
- * Creates a job request object and sets up execution environment. Creates a thread pool
- * to execute job requests.
- *
- * @param requestType
- * Job request type
- *
- * @param concurrentRequestsConfigName
- * Config name to be used to extract number of concurrent requests to be serviced.
- *
- * @param jobTimeoutConfigName
- * Config name to be used to extract maximum time a task can execute a request.
- *
- */
- public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName,
- String jobTimeoutConfigName) {
- this(requestType, concurrentRequestsConfigName, jobTimeoutConfigName, true);
- }
-
- /*
- * Returns true of thread pool is created and can be used for executing a job request.
- * Otherwise, returns false.
- */
- public boolean isThreadPoolEnabled() {
- return this.jobExecutePool != null;
- }
-
- /*
- * Executes job request operation. If thread pool is not created then job request is
- * executed in current thread itself.
- *
- * @param jobExecuteCallable
- * Callable object to run the job request task.
- *
- */
- public T execute(JobCallable<T> jobExecuteCallable) throws InterruptedException,
- TimeoutException, TooManyRequestsException, ExecutionException {
- /*
- * The callable shouldn't be null to execute. The thread pool also should be configured
- * to execute requests.
- */
- assert (jobExecuteCallable != null);
- assert (this.jobExecutePool != null);
-
- String type = this.requestType.toString().toLowerCase();
-
- String retryMessageForConcurrentRequests = "Please wait for some time before retrying "
- + "the operation. Please refer to the config " + concurrentRequestsConfigName
- + " to configure concurrent requests.";
-
- LOG.debug("Starting new " + type + " job request with time out " + this.requestExecutionTimeoutInSec
- + "seconds.");
- Future<T> future = null;
-
- try {
- future = this.jobExecutePool.submit(jobExecuteCallable);
- } catch (RejectedExecutionException rejectedException) {
- /*
- * Not able to find thread to execute the job request. Raise Busy exception and client
- * can retry the operation.
- */
- String tooManyRequestsExceptionMessage = "Unable to service the " + type + " job request as "
- + "templeton service is busy with too many " + type + " job requests. "
- + retryMessageForConcurrentRequests;
-
- LOG.warn(tooManyRequestsExceptionMessage);
- throw new TooManyRequestsException(tooManyRequestsExceptionMessage);
- }
-
- T result = null;
-
- try {
- result = this.requestExecutionTimeoutInSec > 0
- ? future.get(this.requestExecutionTimeoutInSec, TimeUnit.SECONDS) : future.get();
- } catch (TimeoutException e) {
- /*
- * See if the execution thread has just completed operation and result is available.
- * If result is available then return the result. Otherwise, raise exception.
- */
- if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
- String message = this.requestType + " job request got timed out. Please wait for some time "
- + "before retrying the operation. Please refer to the config "
- + jobTimeoutConfigName + " to configure job request time out.";
- LOG.warn(message);
-
- /*
- * Throw TimeoutException to caller.
- */
- throw new TimeoutException(message);
- }
- } catch (InterruptedException e) {
- /*
- * See if the execution thread has just completed operation and result is available.
- * If result is available then return the result. Otherwise, raise exception.
- */
- if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
- String message = this.requestType + " job request got interrupted. Please wait for some time "
- + "before retrying the operation.";
- LOG.warn(message);
-
- /*
- * Throw TimeoutException to caller.
- */
- throw new InterruptedException(message);
- }
- } catch (CancellationException e) {
- /*
- * See if the execution thread has just completed operation and result is available.
- * If result is available then return the result. Otherwise, raise exception.
- */
- if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) {
- String message = this.requestType + " job request got cancelled and thread got interrupted. "
- + "Please wait for some time before retrying the operation.";
- LOG.warn(message);
-
- throw new InterruptedException(message);
- }
- } finally {
- /*
- * If the thread is still active and needs to be cancelled then cancel it. This may
- * happen if the task was interrupted or timed out.
- */
- if (enableCancelTask) {
- cancelExecutePoolThread(future);
- }
- }
-
- LOG.debug("Completed " + type + " job request.");
-
- return result;
- }
-
- /*
- * Initiate cancel request to cancel the thread execution and interrupt the thread.
- * If thread interruption is not handled by jobExecuteCallable then the thread may continue
- * running to completion. The cancel call may fail in some scenarios; in that case, retry
- * the cancel call until it returns true or the max retry count is reached.
- *
- * @param future
- * Future object which has handle to cancel the thread.
- *
- */
- private void cancelExecutePoolThread(Future<T> future) {
- int retryCount = 0;
- while(retryCount < this.maxTaskCancelRetryCount && !future.isDone()) {
- LOG.info("Task is still executing the job request. Cancelling it with retry count: "
- + retryCount);
- if (future.cancel(true)) {
- /*
- * Cancelled the job request and return to client.
- */
- LOG.info("Cancel job request issued successfully.");
- return;
- }
-
- retryCount++;
- try {
- Thread.sleep(this.maxTaskCancelRetryWaitTimeInMs);
- } catch (InterruptedException e) {
- /*
- * Nothing to do. Just retry.
- */
- }
- }
-
- LOG.warn("Failed to cancel the job. isCancelled: " + future.isCancelled()
- + " Retry count: " + retryCount);
- }
-
- /*
- * Tries to get the job result if the job request has completed. Otherwise it sets the job
- * status to FAILED so that the execute thread can do the necessary cleanup based on that state.
- */
- private T tryGetJobResultOrSetJobStateFailed(JobCallable<T> jobExecuteCallable) {
- if (!jobExecuteCallable.setJobStateFailed()) {
- LOG.info("Job is already COMPLETED. Returning the result.");
- return jobExecuteCallable.returnResult;
- } else {
- LOG.info("Job status set to FAILED. Job clean up to be done by execute thread "
- + "after job request is executed.");
- return null;
- }
- }
-}
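For readers tracing this revert, the JobRequestExecutor removed above boils down to a familiar pattern: a bounded thread pool whose rejection, timeout, and interruption outcomes are translated into caller-facing exceptions. Below is a minimal, self-contained sketch of that pattern; BoundedExecutor and TooBusyException are hypothetical names chosen for illustration, not the Hive/WebHCat API.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Minimal sketch of the bounded-pool pattern implemented by the removed
// JobRequestExecutor. BoundedExecutor and TooBusyException are hypothetical
// names for illustration only.
public class BoundedExecutor<T> {
  public static class TooBusyException extends Exception {
    public TooBusyException(String msg) { super(msg); }
  }

  private final ExecutorService pool;
  private final long timeoutSec;

  public BoundedExecutor(int maxThreads, long timeoutSec) {
    // A SynchronousQueue with a fixed max pool size rejects a submission as
    // soon as all threads are busy, instead of queueing it; that rejection is
    // what gets surfaced to the client as a retryable "busy" error.
    this.pool = new ThreadPoolExecutor(maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    this.timeoutSec = timeoutSec;
  }

  public T execute(Callable<T> task) throws TooBusyException, TimeoutException,
      InterruptedException, ExecutionException {
    Future<T> future;
    try {
      future = pool.submit(task);
    } catch (RejectedExecutionException e) {
      throw new TooBusyException("Too many concurrent requests; retry later.");
    }
    try {
      // A timeout of 0 or less means wait indefinitely, as in the removed code.
      return timeoutSec > 0 ? future.get(timeoutSec, TimeUnit.SECONDS) : future.get();
    } finally {
      // Best-effort cancellation; a no-op if the task already completed.
      future.cancel(true);
    }
  }
}

The removed class additionally races the timeout against a COMPLETED/FAILED state flag on the callable so that a result arriving just after the timeout can still be returned; that refinement is omitted here for brevity.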
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index 9bea897..b3f44a2 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -23,19 +23,16 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.templeton.tool.JobState;
import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
@@ -53,26 +50,9 @@ public class LauncherDelegator extends TempletonDelegator {
static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP}
private boolean secureMeatastoreAccess = false;
private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*";
- private final String JOB_SUBMIT_EXECUTE_THREAD_PREFIX = "JobSubmitExecute";
- private final int jobTimeoutTaskRetryCount;
- private final int jobTimeoutTaskRetryIntervalInSec;
-
- /**
- * Current thread used to set in execution threads.
- */
- private final String submitThreadId = Thread.currentThread().getName();
-
- /**
- * Job request executor to submit job requests.
- */
- private static JobRequestExecutor<EnqueueBean> jobRequest =
- new JobRequestExecutor<EnqueueBean>(JobRequestExecutor.JobRequestType.Submit,
- AppConfig.JOB_SUBMIT_MAX_THREADS, AppConfig.JOB_SUBMIT_TIMEOUT, false);
public LauncherDelegator(AppConfig appConf) {
super(appConf);
- jobTimeoutTaskRetryCount = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 0);
- jobTimeoutTaskRetryIntervalInSec = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 0);
}
public void registerJob(String id, String user, String callback,
@@ -90,102 +70,19 @@ public class LauncherDelegator extends TempletonDelegator {
}
}
- /*
- * Submits a job request. If a maximum number of concurrent job submit requests is
- * configured, the submit request is executed on a thread from the thread pool. If a job
- * submit request timeout is configured, the execution thread is interrupted when it times
- * out. A best effort is also made to identify whether the job was submitted and to kill it quietly.
- */
- public EnqueueBean enqueueController(final String user, final Map<String, Object> userArgs,
- final String callback, final List<String> args)
- throws NotAuthorizedException, BusyException, IOException, QueueException, TooManyRequestsException {
-
- EnqueueBean bean = null;
- final TempletonControllerJob controllerJob = getTempletonController();
-
- if (jobRequest.isThreadPoolEnabled()) {
- JobCallable<EnqueueBean> jobExecuteCallable = getJobSubmitTask(user, userArgs, callback,
- args, controllerJob);
- try {
- bean = jobRequest.execute(jobExecuteCallable);
- } catch (TimeoutException ex) {
- /*
- * Job request timed out. Job kill should have started. Return to the client with
- * QueueException.
- */
- throw new QueueException(ex.getMessage());
- } catch (InterruptedException ex) {
- /*
- * Job request got interrupted. Job kill should have started. Return to the client
- * with QueueException.
- */
- throw new QueueException(ex.getMessage());
- } catch (ExecutionException ex) {
- /*
- * ExecutionException is raised if the job execution throws an exception. Return to the
- * client with the exception.
- */
- throw new QueueException(ex.getMessage());
- }
- } else {
- LOG.info("No thread pool configured for submit job request. Executing "
- + "the job request in current thread.");
-
- bean = enqueueJob(user, userArgs, callback, args, controllerJob);
- }
-
- return bean;
- }
-
- /*
- * Job callable task for the job submit operation. Overrides execute() to submit the job,
- * and overrides cleanup() to kill the job in case the job submission request times out
- * or is interrupted.
- */
- private JobCallable<EnqueueBean> getJobSubmitTask(final String user,
- final Map<String, Object> userArgs, final String callback,
- final List<String> args, final TempletonControllerJob controllerJob) {
- return new JobCallable<EnqueueBean>() {
- @Override
- public EnqueueBean execute() throws NotAuthorizedException, BusyException, IOException,
- QueueException {
- /*
- * Change the current thread name to include the parent thread id when executed
- * in the thread pool. This makes it easier to extract logs specific to a job
- * request and to debug job issues.
- */
- Thread.currentThread().setName(String.format("%s-%s-%s", JOB_SUBMIT_EXECUTE_THREAD_PREFIX,
- submitThreadId, Thread.currentThread().getId()));
-
- return enqueueJob(user, userArgs, callback, args, controllerJob);
- }
-
- @Override
- public void cleanup() {
- /*
- * Failed to set the job status to COMPLETED, which means the main thread has
- * exited and is not waiting for the result. Kill the submitted job.
- */
- LOG.info("Job kill not done by main thread. Trying to kill now.");
- killTempletonJobWithRetry(user, controllerJob.getSubmittedId());
- }
- };
- }
-
/**
* Enqueue the TempletonControllerJob directly calling doAs.
*/
- public EnqueueBean enqueueJob(String user, Map<String, Object> userArgs, String callback,
- List<String> args, TempletonControllerJob controllerJob)
+ public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback,
+ List<String> args)
throws NotAuthorizedException, BusyException,
IOException, QueueException {
- UserGroupInformation ugi = null;
try {
- ugi = UgiFactory.getUgi(user);
+ UserGroupInformation ugi = UgiFactory.getUgi(user);
final long startTime = System.nanoTime();
- String id = queueAsUser(ugi, args, controllerJob);
+ String id = queueAsUser(ugi, args);
long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
LOG.debug("queued job " + id + " in " + elapsed + " ms");
@@ -199,91 +96,24 @@ public class LauncherDelegator extends TempletonDelegator {
return new EnqueueBean(id);
} catch (InterruptedException e) {
throw new QueueException("Unable to launch job " + e);
- } finally {
- if (ugi != null) {
- FileSystem.closeAllForUGI(ugi);
- }
}
}
- private String queueAsUser(UserGroupInformation ugi, final List<String> args,
- final TempletonControllerJob controllerJob)
+ private String queueAsUser(UserGroupInformation ugi, final List<String> args)
throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Launching job: " + args);
}
return ugi.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws Exception {
- runTempletonControllerJob(controllerJob, args);
- return controllerJob.getSubmittedId();
+ String[] array = new String[args.size()];
+ TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess, appConf);
+ ToolRunner.run(ctrl, args.toArray(array));
+ return ctrl.getSubmittedId();
}
});
}
- /*
- * Kills the templeton job with multiple retries if the job exists. Returns true if the
- * kill attempt succeeds. Otherwise returns false.
- */
- private boolean killTempletonJobWithRetry(String user, String jobId) {
- /*
- * Null-safe check whether the job submission has gone through and the job id is valid.
- */
- if (StringUtils.startsWith(jobId, "job_")) {
- LOG.info("Started killing the job " + jobId);
-
- boolean success = false;
- int count = 0;
- do {
- try {
- count++;
- killJob(user, jobId);
- success = true;
- LOG.info("Kill job attempt succeeded.");
- } catch (Exception e) {
- LOG.info("Failed to kill the job due to exception: " + e.getMessage());
- LOG.info("Waiting for " + jobTimeoutTaskRetryIntervalInSec + "s before retrying "
- + "the operation. Iteration: " + count);
- try {
- Thread.sleep(jobTimeoutTaskRetryIntervalInSec * 1000);
- } catch (InterruptedException ex) {
- LOG.info("Got interrupted while waiting for next retry.");
- }
- }
- } while (!success && count < jobTimeoutTaskRetryCount);
-
- return success;
- } else {
- LOG.info("Couldn't find a valid job id after job request is timed out.");
- return false;
- }
- }
-
- /*
- * Gets a new templeton controller job object.
- */
- protected TempletonControllerJob getTempletonController() {
- return new TempletonControllerJob(secureMeatastoreAccess, appConf);
- }
-
- /*
- * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run
- * the actual job.
- */
- protected int runTempletonControllerJob(TempletonControllerJob controllerJob, List<String> args)
- throws IOException, InterruptedException, TimeoutException, Exception {
- String[] array = new String[args.size()];
- return ToolRunner.run(controllerJob, args.toArray(array));
- }
-
- /*
- * Uses DeleteDelegator to kill a job; exceptions are propagated to the caller.
- */
- protected void killJob(String user, String jobId)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException {
- DeleteDelegator d = new DeleteDelegator(appConf);
- d.run(user, jobId);
- }
-
public List<String> makeLauncherArgs(AppConfig appConf, String statusdir,
String completedUrl,
List<String> copyFiles,
@@ -350,35 +180,24 @@ public class LauncherDelegator extends TempletonDelegator {
*/
private String getShimLibjars() {
WebHCatJTShim shim = null;
- UserGroupInformation ugi = null;
try {
- ugi = UserGroupInformation.getCurrentUser();
- shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
-
- // Besides the HiveShims jar which is Hadoop version dependent we also
- // always need to include hive shims common jars.
- Path shimCommonJar = new Path(
- TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
- Path shimCommonSecureJar = new Path(
- TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
- Path shimJar = new Path(
- TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
-
- return String.format(
- "%s,%s,%s",
- shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
+ shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, UserGroupInformation.getCurrentUser());
} catch (IOException e) {
- throw new RuntimeException("Failed to get shimLibJars", e);
- } finally {
- try {
- if (ugi != null) {
- FileSystem.closeAllForUGI(ugi);
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to closeAllForUGI", e);
- }
+ throw new RuntimeException("Failed to get WebHCatShim", e);
}
+ // Besides the HiveShims jar which is Hadoop version dependent we also
+ // always need to include hive shims common jars.
+ Path shimCommonJar = new Path(
+ TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
+ Path shimCommonSecureJar = new Path(
+ TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
+ Path shimJar = new Path(
+ TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
+
+ return String.format(
+ "%s,%s,%s",
+ shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
}
// Storage vars
@@ -444,7 +263,7 @@ public class LauncherDelegator extends TempletonDelegator {
}
/**
* This is called by subclasses when they determine that the submitted job requires
- * metastore access (e.g. Pig job that uses HCatalog). This then determines if
+ * metastore access (e.g. Pig job that uses HCatalog). This then determines if
* secure access is required and causes TempletonControllerJob to set up a delegation token.
* @see TempletonControllerJob
*/
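The killTempletonJobWithRetry() logic removed above is a bounded retry loop around a kill call, guarded by a null-safe check that the job id looks valid. Below is a generic sketch of the same shape; killWithRetry, killJob, maxRetries, and retryIntervalMs are illustrative placeholders, not the Hive API.

// Generic bounded-retry sketch of the shape used by the removed
// killTempletonJobWithRetry(). Names are illustrative placeholders.
public final class RetryKillSketch {
  static boolean killWithRetry(Runnable killJob, int maxRetries, long retryIntervalMs) {
    int count = 0;
    do {
      try {
        count++;
        killJob.run();   // attempt the kill
        return true;
      } catch (RuntimeException e) {
        System.err.println("Kill attempt " + count + " failed: " + e.getMessage());
        try {
          Thread.sleep(retryIntervalMs);   // back off before the next attempt
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();   // preserve interrupt status
          return false;
        }
      }
    } while (count < maxRetries);
    return false;
  }

  public static void main(String[] args) {
    // Example: a kill that fails twice and then succeeds on the third attempt.
    final int[] calls = {0};
    boolean killed = killWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new RuntimeException("transient failure");
      }
    }, 5, 10L);
    System.out.println("killed: " + killed);
  }
}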
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
index dfa59f8..a30ecd1 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
@@ -19,15 +19,9 @@
package org.apache.hive.hcatalog.templeton;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobStatus;
@@ -37,82 +31,20 @@ import org.apache.hadoop.security.UserGroupInformation;
* List jobs owned by a user.
*/
public class ListDelegator extends TempletonDelegator {
- private static final Log LOG = LogFactory.getLog(ListDelegator.class);
- private final String JOB_LIST_EXECUTE_THREAD_PREFIX = "JobListExecute";
-
- /**
- * Current thread id used to set in execution threads.
- */
- private final String listThreadId = Thread.currentThread().getName();
-
- /*
- * Job request executor to list job status requests.
- */
- private static JobRequestExecutor<List<JobItemBean>> jobRequest =
- new JobRequestExecutor<List<JobItemBean>>(JobRequestExecutor.JobRequestType.List,
- AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT);
-
public ListDelegator(AppConfig appConf) {
super(appConf);
}
- /*
- * Lists job status requests. If a maximum number of concurrent job list requests is
- * configured, the list request is executed on a thread from the thread pool. If a job
- * list request timeout is configured, the execution thread is interrupted when it times
- * out, and no further action is taken.
- */
- public List<JobItemBean> run(final String user, final boolean showall, final String jobId,
- final int numRecords, final boolean showDetails)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException,
- TimeoutException, ExecutionException, TooManyRequestsException {
-
- if (jobRequest.isThreadPoolEnabled()) {
- return jobRequest.execute(getJobListTask(user, showall, jobId,numRecords, showDetails));
- } else {
- return listJobs(user, showall, jobId, numRecords, showDetails);
- }
- }
-
- /*
- * Job callable task for the job list operation. Overrides execute() to list jobs.
- * There is no need to override cleanup(), as nothing needs to be done if the list
- * jobs operation times out or is interrupted.
- */
- private JobCallable<List<JobItemBean>> getJobListTask(final String user, final boolean showall,
- final String jobId, final int numRecords, final boolean showDetails) {
- return new JobCallable<List<JobItemBean>>() {
- @Override
- public List<JobItemBean> execute() throws NotAuthorizedException, BadParam, IOException,
- InterruptedException {
- /*
- * Change the current thread name to include the parent thread id when executed
- * in the thread pool. This makes it easier to extract logs specific to a job
- * request and to debug job issues.
- */
- Thread.currentThread().setName(String.format("%s-%s-%s", JOB_LIST_EXECUTE_THREAD_PREFIX,
- listThreadId, Thread.currentThread().getId()));
-
- return listJobs(user, showall, jobId, numRecords, showDetails);
- }
- };
- }
-
- /*
- * Gets the list of job ids and calls getJobStatus to get the status of each job.
- */
- public List<JobItemBean> listJobs(String user, boolean showall, String jobId,
- int numRecords, boolean showDetails)
+ public List<String> run(String user, boolean showall)
throws NotAuthorizedException, BadParam, IOException, InterruptedException {
- UserGroupInformation ugi = null;
+ UserGroupInformation ugi = UgiFactory.getUgi(user);
WebHCatJTShim tracker = null;
- ArrayList<String> ids = new ArrayList<String>();
-
try {
- ugi = UgiFactory.getUgi(user);
tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
+ ArrayList<String> ids = new ArrayList<String>();
+
JobStatus[] jobs = tracker.getAllJobs();
if (jobs != null) {
@@ -122,81 +54,13 @@ public class ListDelegator extends TempletonDelegator {
ids.add(id);
}
}
+
+ return ids;
} catch (IllegalStateException e) {
throw new BadParam(e.getMessage());
} finally {
if (tracker != null)
tracker.close();
- if (ugi != null)
- FileSystem.closeAllForUGI(ugi);
}
-
- return getJobStatus(ids, user, showall, jobId, numRecords, showDetails);
- }
-
- /*
- * Returns the status of the given input jobs as a list.
- */
- public List<JobItemBean> getJobStatus(ArrayList<String> jobIds, String user, boolean showall,
- String jobId, int numRecords, boolean showDetails)
- throws IOException, InterruptedException {
-
- List<JobItemBean> detailList = new ArrayList<JobItemBean>();
- int currRecord = 0;
-
- // Sort the list as requested
- boolean isAscendingOrder = true;
- switch (appConf.getListJobsOrder()) {
- case lexicographicaldesc:
- Collections.sort(jobIds, Collections.reverseOrder());
- isAscendingOrder = false;
- break;
- case lexicographicalasc:
- default:
- Collections.sort(jobIds);
- break;
- }
-
- for (String job : jobIds) {
- // If numRecords = -1, fetch all records.
- // Hence skip all the below checks when numRecords = -1.
- if (numRecords != -1) {
- // If currRecord >= numRecords, we have already fetched the top #numRecords
- if (currRecord >= numRecords) {
- break;
- }
- else if (jobId == null || jobId.trim().length() == 0) {
- currRecord++;
- }
- // If the current record needs to be returned based on the
- // filter conditions specified by the user, increment the counter
- else if (isAscendingOrder && job.compareTo(jobId) > 0 || !isAscendingOrder && job.compareTo(jobId) < 0) {
- currRecord++;
- }
- // The current record should not be included in the output detailList.
- else {
- continue;
- }
- }
- JobItemBean jobItem = new JobItemBean();
- jobItem.id = job;
- if (showDetails) {
- StatusDelegator sd = new StatusDelegator(appConf);
- try {
- jobItem.detail = sd.run(user, job, false);
- }
- catch(Exception ex) {
- /*
- * If we could not get the status for some reason, log it and send an empty status back
- * with just the id so that the caller knows to look in the log file.
- */
- LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
- jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
- }
- }
- detailList.add(jobItem);
- }
-
- return detailList;
}
}
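The getJobStatus() pagination removed above sorts the job ids and then returns up to numRecords ids falling strictly after the jobId cursor in the chosen order, with numRecords == -1 meaning "return everything". Below is a compact sketch of just that filtering step; JobIdPager and page() are illustrative names, and the per-id status lookup is omitted.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the cursor-style filtering used by the removed getJobStatus():
// sort the job ids, then return up to numRecords ids strictly after the
// cursor in the chosen order. numRecords == -1 returns everything.
public final class JobIdPager {
  static List<String> page(List<String> ids, String cursor, int numRecords,
      boolean ascending) {
    List<String> sorted = new ArrayList<String>(ids);
    Collections.sort(sorted, ascending ? null : Collections.<String>reverseOrder());
    List<String> out = new ArrayList<String>();
    for (String id : sorted) {
      if (numRecords != -1) {
        if (out.size() >= numRecords) {
          break;   // already collected the requested number of records
        }
        boolean pastCursor = cursor == null || cursor.trim().isEmpty()
            || (ascending ? id.compareTo(cursor) > 0 : id.compareTo(cursor) < 0);
        if (!pastCursor) {
          continue;   // skip ids at or before the cursor
        }
      }
      out.add(id);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> ids = Arrays.asList("job_3", "job_1", "job_2", "job_4");
    // Ascending order, 2 records after cursor "job_1" -> [job_2, job_3]
    System.out.println(page(ids, "job_1", 2, true));
  }
}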
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
index 3ed3ece..5208bf4 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
@@ -25,7 +25,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.HashMap;
import org.slf4j.Logger;
@@ -44,15 +43,14 @@ import org.eclipse.jetty.rewrite.handler.RedirectPatternRule;
import org.eclipse.jetty.rewrite.handler.RewriteHandler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.FilterMapping;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.xml.XmlConfiguration;
import org.slf4j.bridge.SLF4JBridgeHandler;
-import javax.servlet.DispatcherType;
import javax.servlet.http.HttpServletRequest;
/**
@@ -124,7 +122,7 @@ public class Main {
checkEnv();
runServer(port);
// Currently print only the first port, to be consistent with old behavior
- port = ArrayUtils.isEmpty(server.getConnectors()) ? -1 : ((ServerConnector)(server.getConnectors()[0])).getLocalPort();
+ port = ArrayUtils.isEmpty(server.getConnectors()) ? -1 : server.getConnectors()[0].getPort();
System.out.println("templeton: listening on port " + port);
LOG.info("Templeton listening on port " + port);
@@ -187,7 +185,6 @@ public class Main {
// Add the Auth filter
FilterHolder fHolder = makeAuthFilter();
- EnumSet<DispatcherType> dispatches = EnumSet.of(DispatcherType.REQUEST);
/*
* We add filters for each of the URIs supported by templeton.
@@ -196,18 +193,28 @@ public class Main {
* This is because mapreduce does not use secure credentials for
* callbacks. So jetty would fail the request as unauthorized.
*/
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", dispatches);
- root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", dispatches);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*",
+ FilterMapping.REQUEST);
+ root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*",
+ FilterMapping.REQUEST);
if (conf.getBoolean(AppConfig.XSRF_FILTER_ENABLED, false)){
- root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*", dispatches);
+ root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*",
+ FilterMapping.REQUEST);
LOG.debug("XSRF filter enabled");
} else {
LOG.warn("XSRF filter disabled");