You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/08/28 05:15:19 UTC
svn commit: r1621031 [10/10] - in /hive/branches/cbo: ./
common/src/java/org/apache/hadoop/hive/conf/
contrib/src/test/results/clientnegative/
contrib/src/test/results/clientpositive/ data/files/
hbase-handler/src/test/results/negative/ hcatalog/core/s...
Modified: hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h (original)
+++ hive/branches/cbo/service/src/gen/thrift/gen-cpp/TCLIService_types.h Thu Aug 28 03:15:13 2014
@@ -3602,14 +3602,18 @@ class TGetResultSetMetadataResp {
void swap(TGetResultSetMetadataResp &a, TGetResultSetMetadataResp &b);
+typedef struct _TFetchResultsReq__isset {
+ _TFetchResultsReq__isset() : fetchType(true) {}
+ bool fetchType;
+} _TFetchResultsReq__isset;
class TFetchResultsReq {
public:
- static const char* ascii_fingerprint; // = "1B96A8C05BA9DD699FC8CD842240ABDE";
- static const uint8_t binary_fingerprint[16]; // = {0x1B,0x96,0xA8,0xC0,0x5B,0xA9,0xDD,0x69,0x9F,0xC8,0xCD,0x84,0x22,0x40,0xAB,0xDE};
+ static const char* ascii_fingerprint; // = "B4CB1E4F8F8F4D50183DD372AD11753A";
+ static const uint8_t binary_fingerprint[16]; // = {0xB4,0xCB,0x1E,0x4F,0x8F,0x8F,0x4D,0x50,0x18,0x3D,0xD3,0x72,0xAD,0x11,0x75,0x3A};
- TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0) {
+ TFetchResultsReq() : orientation((TFetchOrientation::type)0), maxRows(0), fetchType(0) {
orientation = (TFetchOrientation::type)0;
}
@@ -3619,6 +3623,9 @@ class TFetchResultsReq {
TOperationHandle operationHandle;
TFetchOrientation::type orientation;
int64_t maxRows;
+ int16_t fetchType;
+
+ _TFetchResultsReq__isset __isset;
void __set_operationHandle(const TOperationHandle& val) {
operationHandle = val;
@@ -3632,6 +3639,11 @@ class TFetchResultsReq {
maxRows = val;
}
+ void __set_fetchType(const int16_t val) {
+ fetchType = val;
+ __isset.fetchType = true;
+ }
+
bool operator == (const TFetchResultsReq & rhs) const
{
if (!(operationHandle == rhs.operationHandle))
@@ -3640,6 +3652,10 @@ class TFetchResultsReq {
return false;
if (!(maxRows == rhs.maxRows))
return false;
+ if (__isset.fetchType != rhs.__isset.fetchType)
+ return false;
+ else if (__isset.fetchType && !(fetchType == rhs.fetchType))
+ return false;
return true;
}
bool operator != (const TFetchResultsReq &rhs) const {
Modified: hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java (original)
+++ hive/branches/cbo/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TFetchResultsReq.java Thu Aug 28 03:15:13 2014
@@ -37,6 +37,7 @@ public class TFetchResultsReq implements
private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1);
private static final org.apache.thrift.protocol.TField ORIENTATION_FIELD_DESC = new org.apache.thrift.protocol.TField("orientation", org.apache.thrift.protocol.TType.I32, (short)2);
private static final org.apache.thrift.protocol.TField MAX_ROWS_FIELD_DESC = new org.apache.thrift.protocol.TField("maxRows", org.apache.thrift.protocol.TType.I64, (short)3);
+ private static final org.apache.thrift.protocol.TField FETCH_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("fetchType", org.apache.thrift.protocol.TType.I16, (short)4);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -47,6 +48,7 @@ public class TFetchResultsReq implements
private TOperationHandle operationHandle; // required
private TFetchOrientation orientation; // required
private long maxRows; // required
+ private short fetchType; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -56,7 +58,8 @@ public class TFetchResultsReq implements
* @see TFetchOrientation
*/
ORIENTATION((short)2, "orientation"),
- MAX_ROWS((short)3, "maxRows");
+ MAX_ROWS((short)3, "maxRows"),
+ FETCH_TYPE((short)4, "fetchType");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -77,6 +80,8 @@ public class TFetchResultsReq implements
return ORIENTATION;
case 3: // MAX_ROWS
return MAX_ROWS;
+ case 4: // FETCH_TYPE
+ return FETCH_TYPE;
default:
return null;
}
@@ -118,7 +123,9 @@ public class TFetchResultsReq implements
// isset id assignments
private static final int __MAXROWS_ISSET_ID = 0;
+ private static final int __FETCHTYPE_ISSET_ID = 1;
private byte __isset_bitfield = 0;
+ private _Fields optionals[] = {_Fields.FETCH_TYPE};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -128,6 +135,8 @@ public class TFetchResultsReq implements
new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TFetchOrientation.class)));
tmpMap.put(_Fields.MAX_ROWS, new org.apache.thrift.meta_data.FieldMetaData("maxRows", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.FETCH_TYPE, new org.apache.thrift.meta_data.FieldMetaData("fetchType", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I16)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TFetchResultsReq.class, metaDataMap);
}
@@ -135,6 +144,8 @@ public class TFetchResultsReq implements
public TFetchResultsReq() {
this.orientation = org.apache.hive.service.cli.thrift.TFetchOrientation.FETCH_NEXT;
+ this.fetchType = (short)0;
+
}
public TFetchResultsReq(
@@ -161,6 +172,7 @@ public class TFetchResultsReq implements
this.orientation = other.orientation;
}
this.maxRows = other.maxRows;
+ this.fetchType = other.fetchType;
}
public TFetchResultsReq deepCopy() {
@@ -174,6 +186,8 @@ public class TFetchResultsReq implements
setMaxRowsIsSet(false);
this.maxRows = 0;
+ this.fetchType = (short)0;
+
}
public TOperationHandle getOperationHandle() {
@@ -252,6 +266,28 @@ public class TFetchResultsReq implements
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MAXROWS_ISSET_ID, value);
}
+ public short getFetchType() {
+ return this.fetchType;
+ }
+
+ public void setFetchType(short fetchType) {
+ this.fetchType = fetchType;
+ setFetchTypeIsSet(true);
+ }
+
+ public void unsetFetchType() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __FETCHTYPE_ISSET_ID);
+ }
+
+ /** Returns true if field fetchType is set (has been assigned a value) and false otherwise */
+ public boolean isSetFetchType() {
+ return EncodingUtils.testBit(__isset_bitfield, __FETCHTYPE_ISSET_ID);
+ }
+
+ public void setFetchTypeIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FETCHTYPE_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case OPERATION_HANDLE:
@@ -278,6 +314,14 @@ public class TFetchResultsReq implements
}
break;
+ case FETCH_TYPE:
+ if (value == null) {
+ unsetFetchType();
+ } else {
+ setFetchType((Short)value);
+ }
+ break;
+
}
}
@@ -292,6 +336,9 @@ public class TFetchResultsReq implements
case MAX_ROWS:
return Long.valueOf(getMaxRows());
+ case FETCH_TYPE:
+ return Short.valueOf(getFetchType());
+
}
throw new IllegalStateException();
}
@@ -309,6 +356,8 @@ public class TFetchResultsReq implements
return isSetOrientation();
case MAX_ROWS:
return isSetMaxRows();
+ case FETCH_TYPE:
+ return isSetFetchType();
}
throw new IllegalStateException();
}
@@ -353,6 +402,15 @@ public class TFetchResultsReq implements
return false;
}
+ boolean this_present_fetchType = true && this.isSetFetchType();
+ boolean that_present_fetchType = true && that.isSetFetchType();
+ if (this_present_fetchType || that_present_fetchType) {
+ if (!(this_present_fetchType && that_present_fetchType))
+ return false;
+ if (this.fetchType != that.fetchType)
+ return false;
+ }
+
return true;
}
@@ -375,6 +433,11 @@ public class TFetchResultsReq implements
if (present_maxRows)
builder.append(maxRows);
+ boolean present_fetchType = true && (isSetFetchType());
+ builder.append(present_fetchType);
+ if (present_fetchType)
+ builder.append(fetchType);
+
return builder.toHashCode();
}
@@ -416,6 +479,16 @@ public class TFetchResultsReq implements
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetFetchType()).compareTo(typedOther.isSetFetchType());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetFetchType()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fetchType, typedOther.fetchType);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -455,6 +528,12 @@ public class TFetchResultsReq implements
sb.append("maxRows:");
sb.append(this.maxRows);
first = false;
+ if (isSetFetchType()) {
+ if (!first) sb.append(", ");
+ sb.append("fetchType:");
+ sb.append(this.fetchType);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -540,6 +619,14 @@ public class TFetchResultsReq implements
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 4: // FETCH_TYPE
+ if (schemeField.type == org.apache.thrift.protocol.TType.I16) {
+ struct.fetchType = iprot.readI16();
+ struct.setFetchTypeIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -566,6 +653,11 @@ public class TFetchResultsReq implements
oprot.writeFieldBegin(MAX_ROWS_FIELD_DESC);
oprot.writeI64(struct.maxRows);
oprot.writeFieldEnd();
+ if (struct.isSetFetchType()) {
+ oprot.writeFieldBegin(FETCH_TYPE_FIELD_DESC);
+ oprot.writeI16(struct.fetchType);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -586,6 +678,14 @@ public class TFetchResultsReq implements
struct.operationHandle.write(oprot);
oprot.writeI32(struct.orientation.getValue());
oprot.writeI64(struct.maxRows);
+ BitSet optionals = new BitSet();
+ if (struct.isSetFetchType()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetFetchType()) {
+ oprot.writeI16(struct.fetchType);
+ }
}
@Override
@@ -598,6 +698,11 @@ public class TFetchResultsReq implements
struct.setOrientationIsSet(true);
struct.maxRows = iprot.readI64();
struct.setMaxRowsIsSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.fetchType = iprot.readI16();
+ struct.setFetchTypeIsSet(true);
+ }
}
}
Modified: hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py (original)
+++ hive/branches/cbo/service/src/gen/thrift/gen-py/TCLIService/ttypes.py Thu Aug 28 03:15:13 2014
@@ -5752,6 +5752,7 @@ class TFetchResultsReq:
- operationHandle
- orientation
- maxRows
+ - fetchType
"""
thrift_spec = (
@@ -5759,12 +5760,14 @@ class TFetchResultsReq:
(1, TType.STRUCT, 'operationHandle', (TOperationHandle, TOperationHandle.thrift_spec), None, ), # 1
(2, TType.I32, 'orientation', None, 0, ), # 2
(3, TType.I64, 'maxRows', None, None, ), # 3
+ (4, TType.I16, 'fetchType', None, 0, ), # 4
)
- def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None,):
+ def __init__(self, operationHandle=None, orientation=thrift_spec[2][4], maxRows=None, fetchType=thrift_spec[4][4],):
self.operationHandle = operationHandle
self.orientation = orientation
self.maxRows = maxRows
+ self.fetchType = fetchType
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -5791,6 +5794,11 @@ class TFetchResultsReq:
self.maxRows = iprot.readI64();
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I16:
+ self.fetchType = iprot.readI16();
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -5813,6 +5821,10 @@ class TFetchResultsReq:
oprot.writeFieldBegin('maxRows', TType.I64, 3)
oprot.writeI64(self.maxRows)
oprot.writeFieldEnd()
+ if self.fetchType is not None:
+ oprot.writeFieldBegin('fetchType', TType.I16, 4)
+ oprot.writeI16(self.fetchType)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
Modified: hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb (original)
+++ hive/branches/cbo/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb Thu Aug 28 03:15:13 2014
@@ -1598,11 +1598,13 @@ class TFetchResultsReq
OPERATIONHANDLE = 1
ORIENTATION = 2
MAXROWS = 3
+ FETCHTYPE = 4
FIELDS = {
OPERATIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'operationHandle', :class => ::TOperationHandle},
ORIENTATION => {:type => ::Thrift::Types::I32, :name => 'orientation', :default => 0, :enum_class => ::TFetchOrientation},
- MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'}
+ MAXROWS => {:type => ::Thrift::Types::I64, :name => 'maxRows'},
+ FETCHTYPE => {:type => ::Thrift::Types::I16, :name => 'fetchType', :default => 0, :optional => true}
}
def struct_fields; FIELDS; end
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIService.java Thu Aug 28 03:15:13 2014
@@ -46,7 +46,6 @@ import org.apache.hive.service.Composite
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.operation.Operation;
-import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
@@ -67,7 +66,6 @@ public class CLIService extends Composit
private HiveConf hiveConf;
private SessionManager sessionManager;
- private IMetaStoreClient metastoreClient;
private UserGroupInformation serviceUGI;
private UserGroupInformation httpUGI;
@@ -80,11 +78,8 @@ public class CLIService extends Composit
this.hiveConf = hiveConf;
sessionManager = new SessionManager();
addService(sessionManager);
- /**
- * If auth mode is Kerberos, do a kerberos login for the service from the keytab
- */
- if (hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase(
- HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
+ // If the hadoop cluster is secure, do a kerberos login for the service from the keytab
+ if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
try {
HiveAuthFactory.loginFromKeytab(hiveConf);
this.serviceUGI = ShimLoader.getHadoopShims().getUGIForConf(hiveConf);
@@ -132,21 +127,23 @@ public class CLIService extends Composit
} catch (IOException eIO) {
throw new ServiceException("Error setting stage directories", eIO);
}
-
+ // Initialize and test a connection to the metastore
+ IMetaStoreClient metastoreClient = null;
try {
- // Initialize and test a connection to the metastore
metastoreClient = new HiveMetaStoreClient(hiveConf);
metastoreClient.getDatabases("default");
} catch (Exception e) {
throw new ServiceException("Unable to connect to MetaStore!", e);
}
+ finally {
+ if (metastoreClient != null) {
+ metastoreClient.close();
+ }
+ }
}
@Override
public synchronized void stop() {
- if (metastoreClient != null) {
- metastoreClient.close();
- }
super.stop();
}
@@ -170,7 +167,7 @@ public class CLIService extends Composit
throws HiveSQLException {
SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, null, configuration,
true, delegationToken);
- LOG.debug(sessionHandle + ": openSession()");
+ LOG.debug(sessionHandle + ": openSessionWithImpersonation()");
return sessionHandle;
}
@@ -423,25 +420,20 @@ public class CLIService extends Composit
}
/* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long)
+ * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
*/
@Override
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
+ public RowSet fetchResults(OperationHandle opHandle)
throws HiveSQLException {
- RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
- .getParentSession().fetchResults(opHandle, orientation, maxRows);
- LOG.debug(opHandle + ": fetchResults()");
- return rowSet;
+ return fetchResults(opHandle, Operation.DEFAULT_FETCH_ORIENTATION,
+ Operation.DEFAULT_FETCH_MAX_ROWS, FetchType.QUERY_OUTPUT);
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
- */
@Override
- public RowSet fetchResults(OperationHandle opHandle)
- throws HiveSQLException {
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ long maxRows, FetchType fetchType) throws HiveSQLException {
RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
- .getParentSession().fetchResults(opHandle);
+ .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType);
LOG.debug(opHandle + ": fetchResults()");
return rowSet;
}
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java Thu Aug 28 03:15:13 2014
@@ -28,19 +28,17 @@ import org.apache.hive.service.auth.Hive
*
*/
public abstract class CLIServiceClient implements ICLIService {
+ private static final long DEFAULT_MAX_ROWS = 1000;
public SessionHandle openSession(String username, String password)
throws HiveSQLException {
return openSession(username, password, Collections.<String, String>emptyMap());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
- */
@Override
public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
// TODO: provide STATIC default value
- return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000);
+ return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, DEFAULT_MAX_ROWS, FetchType.QUERY_OUTPUT);
}
@Override
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java Thu Aug 28 03:15:13 2014
@@ -181,13 +181,10 @@ public class EmbeddedCLIServiceClient ex
return cliService.getResultSetMetadata(opHandle);
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.CLIServiceClient#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long)
- */
@Override
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
- throws HiveSQLException {
- return cliService.fetchResults(opHandle, orientation, maxRows);
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ long maxRows, FetchType fetchType) throws HiveSQLException {
+ return cliService.fetchResults(opHandle, orientation, maxRows, fetchType);
}
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/ICLIService.java Thu Aug 28 03:15:13 2014
@@ -27,79 +27,78 @@ import org.apache.hive.service.auth.Hive
public interface ICLIService {
- public abstract SessionHandle openSession(String username, String password,
+ SessionHandle openSession(String username, String password,
Map<String, String> configuration)
throws HiveSQLException;
- public abstract SessionHandle openSessionWithImpersonation(String username, String password,
+ SessionHandle openSessionWithImpersonation(String username, String password,
Map<String, String> configuration, String delegationToken)
throws HiveSQLException;
- public abstract void closeSession(SessionHandle sessionHandle)
+ void closeSession(SessionHandle sessionHandle)
throws HiveSQLException;
- public abstract GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType)
+ GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType)
throws HiveSQLException;
- public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
+ OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
Map<String, String> confOverlay)
throws HiveSQLException;
- public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle,
+ OperationHandle executeStatementAsync(SessionHandle sessionHandle,
String statement, Map<String, String> confOverlay)
throws HiveSQLException;
- public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle)
+ OperationHandle getTypeInfo(SessionHandle sessionHandle)
throws HiveSQLException;
- public abstract OperationHandle getCatalogs(SessionHandle sessionHandle)
+ OperationHandle getCatalogs(SessionHandle sessionHandle)
throws HiveSQLException;
- public abstract OperationHandle getSchemas(SessionHandle sessionHandle,
+ OperationHandle getSchemas(SessionHandle sessionHandle,
String catalogName, String schemaName)
throws HiveSQLException;
- public abstract OperationHandle getTables(SessionHandle sessionHandle,
+ OperationHandle getTables(SessionHandle sessionHandle,
String catalogName, String schemaName, String tableName, List<String> tableTypes)
throws HiveSQLException;
- public abstract OperationHandle getTableTypes(SessionHandle sessionHandle)
+ OperationHandle getTableTypes(SessionHandle sessionHandle)
throws HiveSQLException;
- public abstract OperationHandle getColumns(SessionHandle sessionHandle,
+ OperationHandle getColumns(SessionHandle sessionHandle,
String catalogName, String schemaName, String tableName, String columnName)
throws HiveSQLException;
- public abstract OperationHandle getFunctions(SessionHandle sessionHandle,
+ OperationHandle getFunctions(SessionHandle sessionHandle,
String catalogName, String schemaName, String functionName)
throws HiveSQLException;
- public abstract OperationStatus getOperationStatus(OperationHandle opHandle)
+ OperationStatus getOperationStatus(OperationHandle opHandle)
throws HiveSQLException;
- public abstract void cancelOperation(OperationHandle opHandle)
+ void cancelOperation(OperationHandle opHandle)
throws HiveSQLException;
- public abstract void closeOperation(OperationHandle opHandle)
+ void closeOperation(OperationHandle opHandle)
throws HiveSQLException;
- public abstract TableSchema getResultSetMetadata(OperationHandle opHandle)
+ TableSchema getResultSetMetadata(OperationHandle opHandle)
throws HiveSQLException;
- public abstract RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
- long maxRows)
- throws HiveSQLException;
-
- public abstract RowSet fetchResults(OperationHandle opHandle)
+ RowSet fetchResults(OperationHandle opHandle)
throws HiveSQLException;
- public abstract String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ long maxRows, FetchType fetchType) throws HiveSQLException;
+
+ String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
String owner, String renewer) throws HiveSQLException;
- public abstract void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
String tokenStr) throws HiveSQLException;
- public abstract void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
String tokenStr) throws HiveSQLException;
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java Thu Aug 28 03:15:13 2014
@@ -42,11 +42,8 @@ public class GetCatalogsOperation extend
rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
setState(OperationState.FINISHED);
}
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java Thu Aug 28 03:15:13 2014
@@ -114,11 +114,8 @@ public class GetColumnsOperation extends
this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
try {
IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient();
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java Thu Aug 28 03:15:13 2014
@@ -68,11 +68,8 @@ public class GetFunctionsOperation exten
this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
try {
if ((null == catalogName || "".equals(catalogName))
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java Thu Aug 28 03:15:13 2014
@@ -50,11 +50,8 @@ public class GetSchemasOperation extends
this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
try {
IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient();
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java Thu Aug 28 03:15:13 2014
@@ -50,11 +50,8 @@ public class GetTableTypesOperation exte
rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
try {
for (TableType type : TableType.values()) {
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java Thu Aug 28 03:15:13 2014
@@ -71,11 +71,8 @@ public class GetTablesOperation extends
this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
try {
IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient();
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java Thu Aug 28 03:15:13 2014
@@ -79,11 +79,8 @@ public class GetTypeInfoOperation extend
rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion());
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
try {
for (Type type : Type.values()) {
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java Thu Aug 28 03:15:13 2014
@@ -94,11 +94,8 @@ public class HiveCommandOperation extend
IOUtils.cleanup(LOG, parentSession.getSessionState().err);
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.operation.Operation#run()
- */
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.RUNNING);
try {
String command = getStatement().trim();
@@ -136,6 +133,7 @@ public class HiveCommandOperation extend
setState(OperationState.CLOSED);
tearDownSessionIO();
cleanTmpFile();
+ cleanupOperationLog();
}
/* (non-Javadoc)
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java Thu Aug 28 03:15:13 2014
@@ -46,6 +46,7 @@ public abstract class MetadataOperation
@Override
public void close() throws HiveSQLException {
setState(OperationState.CLOSED);
+ cleanupOperationLog();
}
/**
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/Operation.java Thu Aug 28 03:15:13 2014
@@ -17,6 +17,8 @@
*/
package org.apache.hive.service.cli.operation;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.util.EnumSet;
import java.util.concurrent.Future;
@@ -41,11 +43,14 @@ public abstract class Operation {
private final OperationHandle opHandle;
private HiveConf configuration;
public static final Log LOG = LogFactory.getLog(Operation.class.getName());
+ public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
public static final long DEFAULT_FETCH_MAX_ROWS = 100;
protected boolean hasResultSet;
protected volatile HiveSQLException operationException;
protected final boolean runAsync;
protected volatile Future<?> backgroundHandle;
+ protected OperationLog operationLog;
+ protected boolean isOperationLogEnabled;
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
@@ -106,6 +111,11 @@ public abstract class Operation {
opHandle.setHasResultSet(hasResultSet);
}
+
+ public OperationLog getOperationLog() {
+ return operationLog;
+ }
+
protected final OperationState setState(OperationState newState) throws HiveSQLException {
state.validateTransition(newState);
this.state = newState;
@@ -138,7 +148,97 @@ public abstract class Operation {
return OperationState.ERROR.equals(state);
}
- public abstract void run() throws HiveSQLException;
+ protected void createOperationLog() {
+ if (parentSession.isOperationLogEnabled()) {
+ File operationLogFile = new File(parentSession.getOperationLogSessionDir(),
+ opHandle.getHandleIdentifier().toString());
+ isOperationLogEnabled = true;
+
+ // create log file
+ try {
+ if (operationLogFile.exists()) {
+ LOG.warn("The operation log file should not exist, but it is already there: " +
+ operationLogFile.getAbsolutePath());
+ operationLogFile.delete();
+ }
+ if (!operationLogFile.createNewFile()) {
+ // the log file already exists and cannot be deleted.
+ // If it can be read/written, keep its contents and use it.
+ if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
+ LOG.warn("The already existed operation log file cannot be recreated, " +
+ "and it cannot be read or written: " + operationLogFile.getAbsolutePath());
+ isOperationLogEnabled = false;
+ return;
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e);
+ isOperationLogEnabled = false;
+ return;
+ }
+
+ // create OperationLog object with above log file
+ try {
+ operationLog = new OperationLog(opHandle.toString(), operationLogFile);
+ } catch (FileNotFoundException e) {
+ LOG.warn("Unable to instantiate OperationLog object for operation: " +
+ opHandle, e);
+ isOperationLogEnabled = false;
+ return;
+ }
+
+ // register this operationLog to current thread
+ OperationLog.setCurrentOperationLog(operationLog);
+ }
+ }
+
+ protected void unregisterOperationLog() {
+ if (isOperationLogEnabled) {
+ OperationLog.removeCurrentOperationLog();
+ }
+ }
+
+ /**
+ * Invoked before runInternal().
+ * Set up some preconditions, or configurations.
+ */
+ protected void beforeRun() {
+ createOperationLog();
+ }
+
+ /**
+ * Invoked after runInternal(), even if an exception is thrown in runInternal().
+ * Clean up resources, which was set up in beforeRun().
+ */
+ protected void afterRun() {
+ unregisterOperationLog();
+ }
+
+ /**
+ * Implemented by subclass of Operation class to execute specific behaviors.
+ * @throws HiveSQLException
+ */
+ protected abstract void runInternal() throws HiveSQLException;
+
+ public void run() throws HiveSQLException {
+ beforeRun();
+ try {
+ runInternal();
+ } finally {
+ afterRun();
+ }
+ }
+
+ protected void cleanupOperationLog() {
+ if (isOperationLogEnabled) {
+ if (operationLog == null) {
+ LOG.error("Operation [ " + opHandle.getHandleIdentifier() + " ] "
+ + "logging is enabled, but its OperationLog object cannot be found.");
+ } else {
+ operationLog.close();
+ }
+ }
+ }
// TODO: make this abstract and implement in subclasses.
public void cancel() throws HiveSQLException {
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Thu Aug 28 03:15:13 2014
@@ -18,6 +18,7 @@
package org.apache.hive.service.cli.operation;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -25,22 +26,19 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hive.service.AbstractService;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.log4j.*;
/**
* OperationManager.
*
*/
public class OperationManager extends AbstractService {
-
+ private static final String DEFAULT_LAYOUT_PATTERN = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n";
private final Log LOG = LogFactory.getLog(OperationManager.class.getName());
private HiveConf hiveConf;
@@ -54,7 +52,11 @@ public class OperationManager extends Ab
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
-
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+ initOperationLogCapture();
+ } else {
+ LOG.debug("Operation level logging is turned off");
+ }
super.init(hiveConf);
}
@@ -70,6 +72,30 @@ public class OperationManager extends Ab
super.stop();
}
+ private void initOperationLogCapture() {
+ // There should be a ConsoleAppender. Copy its Layout.
+ Logger root = Logger.getRootLogger();
+ Layout layout = null;
+
+ Enumeration<?> appenders = root.getAllAppenders();
+ while (appenders.hasMoreElements()) {
+ Appender ap = (Appender) appenders.nextElement();
+ if (ap.getClass().equals(ConsoleAppender.class)) {
+ layout = ap.getLayout();
+ break;
+ }
+ }
+
+ if (layout == null) {
+ layout = new PatternLayout(DEFAULT_LAYOUT_PATTERN);
+ LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern.");
+ }
+
+ // Register another Appender (with the same layout) that talks to us.
+ Appender ap = new LogDivertAppender(layout, this);
+ root.addAppender(ap);
+ }
+
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
String statement, Map<String, String> confOverlay, boolean runAsync)
throws HiveSQLException {
@@ -191,4 +217,39 @@ public class OperationManager extends Ab
throws HiveSQLException {
return getOperation(opHandle).getNextRowSet(orientation, maxRows);
}
+
+ public RowSet getOperationLogRowSet(OperationHandle opHandle,
+ FetchOrientation orientation, long maxRows)
+ throws HiveSQLException {
+ // get the OperationLog object from the operation
+ OperationLog operationLog = getOperation(opHandle).getOperationLog();
+ if (operationLog == null) {
+ throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle);
+ }
+
+ // read logs
+ List<String> logs = operationLog.readOperationLog(orientation, maxRows);
+
+ // convert logs to RowSet
+ TableSchema tableSchema = new TableSchema(getLogSchema());
+ RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion());
+ for (String log : logs) {
+ rowSet.addRow(new String[] {log});
+ }
+
+ return rowSet;
+ }
+
+ private Schema getLogSchema() {
+ Schema schema = new Schema();
+ FieldSchema fieldSchema = new FieldSchema();
+ fieldSchema.setName("operation_log");
+ fieldSchema.setType("string");
+ schema.addToFieldSchemas(fieldSchema);
+ return schema;
+ }
+
+ public OperationLog getOperationLogByThread() {
+ return OperationLog.getCurrentOperationLog();
+ }
}
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Thu Aug 28 03:15:13 2014
@@ -60,6 +60,7 @@ import org.apache.hive.service.cli.RowSe
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
/**
* SQLOperation.
@@ -134,7 +135,7 @@ public class SQLOperation extends Execut
}
}
- private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException {
+ private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException {
try {
// In Hive server mode, we are not able to retry in the FetchTask
// case, when calling fetch queries since execute() has returned.
@@ -164,50 +165,63 @@ public class SQLOperation extends Execut
}
@Override
- public void run() throws HiveSQLException {
+ public void runInternal() throws HiveSQLException {
setState(OperationState.PENDING);
final HiveConf opConfig = getConfigForOperation();
prepare(opConfig);
if (!shouldRunAsync()) {
- runInternal(opConfig);
+ runQuery(opConfig);
} else {
+ // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
final SessionState parentSessionState = SessionState.get();
- // current Hive object needs to be set in aysnc thread in case of remote metastore.
- // The metastore client in Hive is associated with right user
- final Hive sessionHive = getCurrentHive();
- // current UGI will get used by metastore when metsatore is in embedded mode
- // so this needs to get passed to the new async thread
+ // ThreadLocal Hive object needs to be set in background thread.
+ // The metastore client in Hive is associated with right user.
+ final Hive parentHive = getSessionHive();
+ // Current UGI will get used by metastore when metastore is in embedded mode
+ // So this needs to get passed to the new background thread
final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
-
// Runnable impl to call runInternal asynchronously,
// from a different thread
Runnable backgroundOperation = new Runnable() {
-
@Override
public void run() {
PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws HiveSQLException {
-
- // Storing the current Hive object necessary when doAs is enabled
- // User information is part of the metastore client member in Hive
- Hive.set(sessionHive);
+ Hive.set(parentHive);
SessionState.setCurrentSessionState(parentSessionState);
+ // Set current OperationLog in this async thread for keeping on saving query log.
+ registerCurrentOperationLog();
try {
- runInternal(opConfig);
+ runQuery(opConfig);
} catch (HiveSQLException e) {
setOperationException(e);
LOG.error("Error running hive query: ", e);
+ } finally {
+ unregisterOperationLog();
}
return null;
}
};
+
try {
ShimLoader.getHadoopShims().doAs(currentUGI, doAsAction);
} catch (Exception e) {
setOperationException(new HiveSQLException(e));
LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
}
+ finally {
+ /**
+ * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+ * when this thread is garbage collected later.
+ * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+ */
+ if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+ ThreadWithGarbageCleanup currentThread =
+ (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+ currentThread.cacheThreadLocalRawStore();
+ }
+ }
}
};
try {
@@ -223,6 +237,12 @@ public class SQLOperation extends Execut
}
}
+ /**
+ * Returns the current UGI on the stack
+ * @param opConfig
+ * @return UserGroupInformation
+ * @throws HiveSQLException
+ */
private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException {
try {
return ShimLoader.getHadoopShims().getUGIForConf(opConfig);
@@ -231,11 +251,28 @@ public class SQLOperation extends Execut
}
}
- private Hive getCurrentHive() throws HiveSQLException {
+ /**
+ * Returns the ThreadLocal Hive for the current thread
+ * @return Hive
+ * @throws HiveSQLException
+ */
+ private Hive getSessionHive() throws HiveSQLException {
try {
return Hive.get();
} catch (HiveException e) {
- throw new HiveSQLException("Failed to get current Hive object", e);
+ throw new HiveSQLException("Failed to get ThreadLocal Hive object", e);
+ }
+ }
+
+ private void registerCurrentOperationLog() {
+ if (isOperationLogEnabled) {
+ if (operationLog == null) {
+ LOG.warn("Failed to get current OperationLog object of Operation: " +
+ getHandle().getHandleIdentifier());
+ isOperationLogEnabled = false;
+ return;
+ }
+ OperationLog.setCurrentOperationLog(operationLog);
}
}
@@ -267,6 +304,7 @@ public class SQLOperation extends Execut
@Override
public void close() throws HiveSQLException {
cleanup(OperationState.CLOSED);
+ cleanupOperationLog();
}
@Override
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Thu Aug 28 03:15:13 2014
@@ -23,13 +23,7 @@ import java.util.Map;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
public interface HiveSession extends HiveSessionBase {
@@ -144,10 +138,8 @@ public interface HiveSession extends Hiv
public TableSchema getResultSetMetadata(OperationHandle opHandle)
throws HiveSQLException;
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
- throws HiveSQLException;
-
- public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException;
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ long maxRows, FetchType fetchType) throws HiveSQLException;
public String getDelegationToken(HiveAuthFactory authFactory, String owner,
String renewer) throws HiveSQLException;
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java Thu Aug 28 03:15:13 2014
@@ -24,6 +24,7 @@ import org.apache.hive.service.cli.Sessi
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import java.io.File;
import java.util.Map;
/**
@@ -38,40 +39,57 @@ public interface HiveSessionBase {
* Set the session manager for the session
* @param sessionManager
*/
- public void setSessionManager(SessionManager sessionManager);
+ void setSessionManager(SessionManager sessionManager);
/**
* Get the session manager for the session
*/
- public SessionManager getSessionManager();
+ SessionManager getSessionManager();
/**
* Set operation manager for the session
* @param operationManager
*/
- public void setOperationManager(OperationManager operationManager);
+ void setOperationManager(OperationManager operationManager);
/**
* Initialize the session
* @param sessionConfMap
*/
- public void initialize(Map<String, String> sessionConfMap) throws Exception;
+ void initialize(Map<String, String> sessionConfMap) throws Exception;
- public SessionHandle getSessionHandle();
+ /**
+ * Check whether operation logging is enabled and session dir is created successfully
+ */
+ boolean isOperationLogEnabled();
+
+ /**
+ * Get the session dir, which is the parent dir of operation logs
+ * @return a file representing the parent directory of operation logs
+ */
+ File getOperationLogSessionDir();
+
+ /**
+ * Set the session dir, which is the parent dir of operation logs
+ * @param operationLogRootDir the parent dir of the session dir
+ */
+ void setOperationLogSessionDir(File operationLogRootDir);
+
+ SessionHandle getSessionHandle();
- public String getUsername();
+ String getUsername();
- public String getPassword();
+ String getPassword();
- public HiveConf getHiveConf();
+ HiveConf getHiveConf();
- public SessionState getSessionState();
+ SessionState getSessionState();
- public String getUserName();
+ String getUserName();
- public void setUserName(String userName);
+ void setUserName(String userName);
- public String getIpAddress();
+ String getIpAddress();
- public void setIpAddress(String ipAddress);
+ void setIpAddress(String ipAddress);
}
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Thu Aug 28 03:15:13 2014
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
@@ -44,14 +45,7 @@ import org.apache.hadoop.hive.ql.process
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
import org.apache.hive.service.cli.operation.GetCatalogsOperation;
import org.apache.hive.service.cli.operation.GetColumnsOperation;
@@ -62,6 +56,7 @@ import org.apache.hive.service.cli.opera
import org.apache.hive.service.cli.operation.MetadataOperation;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
/**
* HiveSession
@@ -86,6 +81,8 @@ public class HiveSessionImpl implements
private OperationManager operationManager;
private IMetaStoreClient metastoreClient = null;
private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
+ private boolean isOperationLogEnabled;
+ private File sessionLogDir;
public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
HiveConf serverhiveConf, String ipAddress) {
@@ -95,14 +92,19 @@ public class HiveSessionImpl implements
this.hiveConf = new HiveConf(serverhiveConf);
this.ipAddress = ipAddress;
- // set an explicit session name to control the download directory name
+ // Set an explicit session name to control the download directory name
hiveConf.set(ConfVars.HIVESESSIONID.varname,
sessionHandle.getHandleIdentifier().toString());
- // use thrift transportable formatter
+ // Use thrift transportable formatter
hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
FetchFormatter.ThriftFormatter.class.getName());
hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
+ /**
+ * Create a new SessionState object that will be associated with this HiveServer2 session.
+ * When the server executes multiple queries in the same session,
+ * this SessionState object is reused across multiple queries.
+ */
sessionState = new SessionState(hiveConf, username);
sessionState.setUserIpAddress(ipAddress);
sessionState.setIsHiveServerQuery(true);
@@ -111,11 +113,9 @@ public class HiveSessionImpl implements
@Override
public void initialize(Map<String, String> sessionConfMap) throws Exception {
- //process global init file: .hiverc
+ // Process global init file: .hiverc
processGlobalInitFile();
- SessionState.setCurrentSessionState(sessionState);
-
- //set conf properties specified by user from client side
+ // Set conf properties specified by user from client side
if (sessionConfMap != null) {
configureSession(sessionConfMap);
}
@@ -169,6 +169,7 @@ public class HiveSessionImpl implements
}
private void configureSession(Map<String, String> sessionConfMap) throws Exception {
+ SessionState.setCurrentSessionState(sessionState);
for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
String key = entry.getKey();
if (key.startsWith("set:")) {
@@ -182,6 +183,34 @@ public class HiveSessionImpl implements
}
@Override
+ public void setOperationLogSessionDir(File operationLogRootDir) {
+ sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
+ isOperationLogEnabled = true;
+
+ if (!sessionLogDir.exists()) {
+ if (!sessionLogDir.mkdir()) {
+ LOG.warn("Unable to create operation log session directory: " +
+ sessionLogDir.getAbsolutePath());
+ isOperationLogEnabled = false;
+ }
+ }
+
+ if (isOperationLogEnabled) {
+ LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
+ }
+ }
+
+ @Override
+ public boolean isOperationLogEnabled() {
+ return isOperationLogEnabled;
+ }
+
+ @Override
+ public File getOperationLogSessionDir() {
+ return sessionLogDir;
+ }
+
+ @Override
public TProtocolVersion getProtocolVersion() {
return sessionHandle.getProtocolVersion();
}
@@ -211,14 +240,26 @@ public class HiveSessionImpl implements
}
protected synchronized void acquire() throws HiveSQLException {
- // need to make sure that the this connections session state is
- // stored in the thread local for sessions.
+ // Need to make sure that this HiveServer2 session's session state is
+ // stored in the thread local for the handler thread.
SessionState.setCurrentSessionState(sessionState);
}
+ /**
+ * 1. We'll remove the ThreadLocal SessionState as this thread might now serve
+ * other requests.
+ * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+ * when this thread is garbage collected later.
+ * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+ */
protected synchronized void release() {
assert sessionState != null;
SessionState.detachSession();
+ if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+ ThreadWithGarbageCleanup currentThread =
+ (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+ currentThread.cacheThreadLocalRawStore();
+ }
}
@Override
@@ -468,7 +509,7 @@ public class HiveSessionImpl implements
try {
acquire();
/**
- * For metadata operations like getTables(), getColumns() etc,
+ * For metadata operations like getTables(), getColumns() etc,
* the session allocates a private metastore handler which should be
* closed at the end of the session
*/
@@ -480,6 +521,9 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
}
opHandleSet.clear();
+ // Cleanup session log directory.
+ cleanupSessionLogDir();
+
HiveHistory hiveHist = sessionState.getHiveHistory();
if (null != hiveHist) {
hiveHist.closeStream();
@@ -492,6 +536,16 @@ public class HiveSessionImpl implements
}
}
+ private void cleanupSessionLogDir() {
+ if (isOperationLogEnabled) {
+ try {
+ FileUtils.forceDelete(sessionLogDir);
+ } catch (Exception e) {
+ LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
+ }
+ }
+ }
+
@Override
public SessionState getSessionState() {
return sessionState;
@@ -539,22 +593,17 @@ public class HiveSessionImpl implements
}
@Override
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
- throws HiveSQLException {
- acquire();
- try {
- return sessionManager.getOperationManager()
- .getOperationNextRowSet(opHandle, orientation, maxRows);
- } finally {
- release();
- }
- }
-
- @Override
- public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ long maxRows, FetchType fetchType) throws HiveSQLException {
acquire();
try {
- return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
+ if (fetchType == FetchType.QUERY_OUTPUT) {
+ return sessionManager.getOperationManager()
+ .getOperationNextRowSet(opHandle, orientation, maxRows);
+ } else {
+ return sessionManager.getOperationManager()
+ .getOperationLogRowSet(opHandle, orientation, maxRows);
+ }
} finally {
release();
}
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Thu Aug 28 03:15:13 2014
@@ -18,6 +18,8 @@
package org.apache.hive.service.cli.session;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +28,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -38,6 +41,7 @@ import org.apache.hive.service.cli.HiveS
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
/**
* SessionManager.
@@ -52,6 +56,8 @@ public class SessionManager extends Comp
new ConcurrentHashMap<SessionHandle, HiveSession>();
private final OperationManager operationManager = new OperationManager();
private ThreadPoolExecutor backgroundOperationPool;
+ private boolean isOperationLogEnabled;
+ private File operationLogRootDir;
public SessionManager() {
super("SessionManager");
@@ -64,22 +70,31 @@ public class SessionManager extends Comp
} catch (HiveException e) {
throw new RuntimeException("Error applying authorization policy on hive configuration", e);
}
-
this.hiveConf = hiveConf;
+ //Create operation log root directory, if operation logging is enabled
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+ initOperationLogRootDir();
+ }
+ createBackgroundOperationPool();
+ addService(operationManager);
+ super.init(hiveConf);
+ }
+
+ private void createBackgroundOperationPool() {
int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
- LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize);
+ LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize);
int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
- LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize);
+ LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize);
int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
- LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime);
+ LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime);
// Create a thread pool with #backgroundPoolSize threads
// Threads terminate when they are idle for more than the keepAliveTime
- // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+ // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+ String threadPoolName = "HiveServer2-Background-Pool";
backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
- keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
+ keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
backgroundOperationPool.allowCoreThreadTimeOut(true);
- addService(operationManager);
- super.init(hiveConf);
}
private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
@@ -91,6 +106,36 @@ public class SessionManager extends Comp
ss.applyAuthorizationPolicy();
}
+ private void initOperationLogRootDir() {
+ operationLogRootDir = new File(
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
+ isOperationLogEnabled = true;
+
+ if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
+ LOG.warn("The operation log root directory exists, but it is not a directory: " +
+ operationLogRootDir.getAbsolutePath());
+ isOperationLogEnabled = false;
+ }
+
+ if (!operationLogRootDir.exists()) {
+ if (!operationLogRootDir.mkdirs()) {
+ LOG.warn("Unable to create operation log root directory: " +
+ operationLogRootDir.getAbsolutePath());
+ isOperationLogEnabled = false;
+ }
+ }
+
+ if (isOperationLogEnabled) {
+ LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
+ try {
+ FileUtils.forceDeleteOnExit(operationLogRootDir);
+ } catch (IOException e) {
+ LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " +
+ operationLogRootDir.getAbsolutePath(), e);
+ }
+ }
+ }
+
@Override
public synchronized void start() {
super.start();
@@ -109,6 +154,18 @@ public class SessionManager extends Comp
" seconds has been exceeded. RUNNING background operations will be shut down", e);
}
}
+ cleanupLoggingRootDir();
+ }
+
+ private void cleanupLoggingRootDir() {
+ if (isOperationLogEnabled) {
+ try {
+ FileUtils.forceDelete(operationLogRootDir);
+ } catch (Exception e) {
+ LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir
+ .getAbsolutePath(), e);
+ }
+ }
}
public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,
@@ -132,6 +189,9 @@ public class SessionManager extends Comp
session.setOperationManager(operationManager);
try {
session.initialize(sessionConf);
+ if (isOperationLogEnabled) {
+ session.setOperationLogSessionDir(operationLogRootDir);
+ }
session.open();
} catch (Exception e) {
throw new HiveSQLException("Failed to open new session", e);
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Thu Aug 28 03:15:13 2014
@@ -19,12 +19,17 @@
package org.apache.hive.service.cli.thrift;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
@@ -65,6 +70,11 @@ public class ThriftBinaryCLIService exte
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+ workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME);
+ String threadPoolName = "HiveServer2-Handler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
TServerSocket serverSocket = null;
if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
@@ -84,8 +94,7 @@ public class ThriftBinaryCLIService exte
.processorFactory(processorFactory)
.transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
- .minWorkerThreads(minWorkerThreads)
- .maxWorkerThreads(maxWorkerThreads);
+ .executorService(executorService);
server = new TThreadPoolServer(sargs);
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Thu Aug 28 03:15:13 2014
@@ -29,20 +29,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.cli.CLIService;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
@@ -71,6 +61,7 @@ public abstract class ThriftCLIService e
protected int minWorkerThreads;
protected int maxWorkerThreads;
+ protected int workerKeepAliveTime;
protected static HiveAuthFactory hiveAuthFactory;
@@ -242,7 +233,9 @@ public abstract class ThriftCLIService e
if (userName == null) {
userName = req.getUsername();
}
- return getProxyUser(userName, req.getConfiguration(), getIpAddress());
+ String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
+ LOG.debug("Client's username: " + effectiveClientUser);
+ return effectiveClientUser;
}
/**
@@ -532,7 +525,8 @@ public abstract class ThriftCLIService e
RowSet rowSet = cliService.fetchResults(
new OperationHandle(req.getOperationHandle()),
FetchOrientation.getFetchOrientation(req.getOrientation()),
- req.getMaxRows());
+ req.getMaxRows(),
+ FetchType.getFetchType(req.getFetchType()));
resp.setResults(rowSet.toTRowSet());
resp.setHasMoreRows(false);
resp.setStatus(OK_STATUS);
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Thu Aug 28 03:15:13 2014
@@ -22,18 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.CLIServiceClient;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
import org.apache.thrift.TException;
/**
@@ -377,17 +366,15 @@ public class ThriftCLIServiceClient exte
}
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long)
- */
@Override
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
- throws HiveSQLException {
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+ FetchType fetchType) throws HiveSQLException {
try {
TFetchResultsReq req = new TFetchResultsReq();
req.setOperationHandle(opHandle.toTOperationHandle());
req.setOrientation(orientation.toTFetchOrientation());
req.setMaxRows(maxRows);
+ req.setFetchType(fetchType.toTFetchType());
TFetchResultsResp resp = cliService.FetchResults(req);
checkStatus(resp.getStatus());
return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
@@ -404,7 +391,7 @@ public class ThriftCLIServiceClient exte
@Override
public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
// TODO: set the correct default fetch size
- return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000);
+ return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
}
@Override
Modified: hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1621031&r1=1621030&r2=1621031&view=diff
==============================================================================
--- hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/cbo/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Thu Aug 28 03:15:13 2014
@@ -18,6 +18,11 @@
package org.apache.hive.service.cli.thrift;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -26,6 +31,7 @@ import org.apache.hadoop.util.Shell;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -36,7 +42,7 @@ import org.eclipse.jetty.server.ssl.SslS
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
public class ThriftHttpCLIService extends ThriftCLIService {
@@ -63,13 +69,17 @@ public class ThriftHttpCLIService extend
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
+ workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME);
String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
httpServer = new org.eclipse.jetty.server.Server();
- QueuedThreadPool threadPool = new QueuedThreadPool();
- threadPool.setMinThreads(minWorkerThreads);
- threadPool.setMaxThreads(maxWorkerThreads);
+ String threadPoolName = "HiveServer2-HttpHandler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
+
+ ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
httpServer.setThreadPool(threadPool);
SelectChannelConnector connector = new SelectChannelConnector();;