You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/11/22 02:46:24 UTC
[15/35] hive git commit: HIVE-15232: Add notification events for
functions and indexes (Mohit Sabharwal, reviewed by Chaoyu Tang)
HIVE-15232: Add notification events for functions and indexes (Mohit Sabharwal, reviewed by Chaoyu Tang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/62d802b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/62d802b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/62d802b8
Branch: refs/heads/hive-14535
Commit: 62d802b871f9654041bb8551b9622b9f5c75e856
Parents: cebd251
Author: Mohit Sabharwal <mo...@cloudera.com>
Authored: Fri Nov 18 10:29:37 2016 -0500
Committer: Mohit Sabharwal <mo...@cloudera.com>
Committed: Fri Nov 18 10:29:37 2016 -0500
----------------------------------------------------------------------
.../hive/hcatalog/common/HCatConstants.java | 5 +
.../listener/DbNotificationListener.java | 77 ++++-
.../hcatalog/messaging/AlterIndexMessage.java | 30 ++
.../messaging/CreateFunctionMessage.java | 30 ++
.../hcatalog/messaging/CreateIndexMessage.java | 30 ++
.../hcatalog/messaging/DropFunctionMessage.java | 30 ++
.../hcatalog/messaging/DropIndexMessage.java | 30 ++
.../hcatalog/messaging/HCatEventMessage.java | 10 +-
.../hcatalog/messaging/MessageDeserializer.java | 39 ++-
.../hive/hcatalog/messaging/MessageFactory.java | 38 +++
.../messaging/json/JSONAlterIndexMessage.java | 89 ++++++
.../json/JSONCreateFunctionMessage.java | 81 ++++++
.../messaging/json/JSONCreateIndexMessage.java | 82 ++++++
.../messaging/json/JSONDropFunctionMessage.java | 81 ++++++
.../messaging/json/JSONDropIndexMessage.java | 82 ++++++
.../messaging/json/JSONMessageDeserializer.java | 56 +++-
.../messaging/json/JSONMessageFactory.java | 62 +++-
.../listener/DummyRawStoreFailEvent.java | 30 +-
.../listener/TestDbNotificationListener.java | 286 ++++++++++++++++++-
.../hadoop/hive/metastore/HiveMetaStore.java | 51 ++--
.../hive/metastore/MetaStoreEventListener.java | 16 ++
.../metastore/events/CreateFunctionEvent.java | 39 +++
.../metastore/events/DropFunctionEvent.java | 39 +++
.../hadoop/hive/metastore/DummyListener.java | 12 +
24 files changed, 1282 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index 72930eb..3998407 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -162,6 +162,11 @@ public final class HCatConstants {
public static final String HCAT_DROP_TABLE_EVENT = "DROP_TABLE";
public static final String HCAT_CREATE_DATABASE_EVENT = "CREATE_DATABASE";
public static final String HCAT_DROP_DATABASE_EVENT = "DROP_DATABASE";
+ public static final String HCAT_CREATE_FUNCTION_EVENT = "CREATE_FUNCTION";
+ public static final String HCAT_DROP_FUNCTION_EVENT = "DROP_FUNCTION";
+ public static final String HCAT_CREATE_INDEX_EVENT = "CREATE_INDEX";
+ public static final String HCAT_DROP_INDEX_EVENT = "DROP_INDEX";
+ public static final String HCAT_ALTER_INDEX_EVENT = "ALTER_INDEX";
public static final String HCAT_INSERT_EVENT = "INSERT";
public static final String HCAT_MESSAGE_VERSION = "HCAT_MESSAGE_VERSION";
public static final String HCAT_MESSAGE_FORMAT = "HCAT_MESSAGE_FORMAT";
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 0b3d891..ea7520d 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -17,8 +17,13 @@
*/
package org.apache.hive.hcatalog.listener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
@@ -42,6 +47,8 @@ import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
@@ -221,6 +228,72 @@ public class DbNotificationListener extends MetaStoreEventListener {
enqueue(event);
}
+ /**
+ * @param fnEvent function event
+ * @throws MetaException
+ */
+ public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException {
+ Function fn = fnEvent.getFunction();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_CREATE_FUNCTION_EVENT,
+ msgFactory.buildCreateFunctionMessage(fn).toString());
+ event.setDbName(fn.getDbName());
+ enqueue(event);
+ }
+
+ /**
+ * @param fnEvent function event
+ * @throws MetaException
+ */
+ public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException {
+ Function fn = fnEvent.getFunction();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_DROP_FUNCTION_EVENT,
+ msgFactory.buildDropFunctionMessage(fn).toString());
+ event.setDbName(fn.getDbName());
+ enqueue(event);
+ }
+
+ /**
+ * @param indexEvent index event
+ * @throws MetaException
+ */
+ public void onAddIndex (AddIndexEvent indexEvent) throws MetaException {
+ Index index = indexEvent.getIndex();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_CREATE_INDEX_EVENT,
+ msgFactory.buildCreateIndexMessage(index).toString());
+ event.setDbName(index.getDbName());
+ enqueue(event);
+ }
+
+ /**
+ * @param indexEvent index event
+ * @throws MetaException
+ */
+ public void onDropIndex (DropIndexEvent indexEvent) throws MetaException {
+ Index index = indexEvent.getIndex();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_DROP_INDEX_EVENT,
+ msgFactory.buildDropIndexMessage(index).toString());
+ event.setDbName(index.getDbName());
+ enqueue(event);
+ }
+
+ /**
+ * @param indexEvent index event
+ * @throws MetaException
+ */
+ public void onAlterIndex (AlterIndexEvent indexEvent) throws MetaException {
+ Index before = indexEvent.getOldIndex();
+ Index after = indexEvent.getNewIndex();
+ NotificationEvent event = new NotificationEvent(0, now(),
+ HCatConstants.HCAT_ALTER_INDEX_EVENT,
+ msgFactory.buildAlterIndexMessage(before, after).toString());
+ event.setDbName(before.getDbName());
+ enqueue(event);
+ }
+
@Override
public void onInsert(InsertEvent insertEvent) throws MetaException {
NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT,
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java
new file mode 100644
index 0000000..4841dce
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging;
+
+/**
+ * HCat message sent when an Index is altered in HCatalog.
+ */
+public abstract class AlterIndexMessage extends HCatEventMessage {
+
+ protected AlterIndexMessage() {
+ super(EventType.ALTER_INDEX);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java
new file mode 100644
index 0000000..753c165
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Function is created in HCatalog.
+ */
+public abstract class CreateFunctionMessage extends HCatEventMessage {
+
+ protected CreateFunctionMessage() {
+ super(EventType.CREATE_FUNCTION);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java
new file mode 100644
index 0000000..192f6de
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging;
+
+/**
+ * HCat message sent when an Index is created in HCatalog.
+ */
+public abstract class CreateIndexMessage extends HCatEventMessage {
+
+ protected CreateIndexMessage() {
+ super(EventType.CREATE_INDEX);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java
new file mode 100644
index 0000000..19d4d5b
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Function is dropped in HCatalog.
+ */
+public abstract class DropFunctionMessage extends HCatEventMessage {
+
+ protected DropFunctionMessage() {
+ super(EventType.DROP_FUNCTION);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java
new file mode 100644
index 0000000..46b7394
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging;
+
+/**
+ * HCat message sent when an Index is dropped in HCatalog.
+ */
+public abstract class DropIndexMessage extends HCatEventMessage {
+
+ protected DropIndexMessage() {
+ super(EventType.DROP_INDEX);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java
index 538fa68..dca95c7 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java
@@ -40,7 +40,12 @@ public abstract class HCatEventMessage {
DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT),
ALTER_TABLE(HCatConstants.HCAT_ALTER_TABLE_EVENT),
ALTER_PARTITION(HCatConstants.HCAT_ALTER_PARTITION_EVENT),
- INSERT(HCatConstants.HCAT_INSERT_EVENT);
+ INSERT(HCatConstants.HCAT_INSERT_EVENT),
+ CREATE_FUNCTION(HCatConstants.HCAT_CREATE_FUNCTION_EVENT),
+ DROP_FUNCTION(HCatConstants.HCAT_DROP_FUNCTION_EVENT),
+ CREATE_INDEX(HCatConstants.HCAT_CREATE_INDEX_EVENT),
+ DROP_INDEX(HCatConstants.HCAT_DROP_INDEX_EVENT),
+ ALTER_INDEX(HCatConstants.HCAT_ALTER_INDEX_EVENT);
private String typeString;
@@ -85,7 +90,7 @@ public abstract class HCatEventMessage {
* Getter for the timestamp associated with the operation.
* @return Timestamp (Long - seconds since epoch).
*/
- public abstract Long getTimestamp();
+ public abstract Long getTimestamp();
/**
* Class invariant. Checked after construction or deserialization.
@@ -97,7 +102,6 @@ public abstract class HCatEventMessage {
throw new IllegalStateException("Event-type unset.");
if (getDB() == null)
throw new IllegalArgumentException("DB-name unset.");
-
return this;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java
index 8ea3998..e18780f 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java
@@ -19,6 +19,9 @@
package org.apache.hive.hcatalog.messaging;
+import org.apache.hive.hcatalog.messaging.json.JSONCreateFunctionMessage;
+import org.apache.hive.hcatalog.messaging.json.JSONDropFunctionMessage;
+
/**
* Interface for converting HCat events from String-form back to HCatEventMessage instances.
*/
@@ -46,9 +49,18 @@ public abstract class MessageDeserializer {
return getAlterPartitionMessage(messageBody);
case DROP_PARTITION:
return getDropPartitionMessage(messageBody);
+ case CREATE_FUNCTION:
+ return getCreateFunctionMessage(messageBody);
+ case DROP_FUNCTION:
+ return getDropFunctionMessage(messageBody);
+ case CREATE_INDEX:
+ return getCreateIndexMessage(messageBody);
+ case DROP_INDEX:
+ return getDropIndexMessage(messageBody);
+ case ALTER_INDEX:
+ return getAlterIndexMessage(messageBody);
case INSERT:
return getInsertMessage(messageBody);
-
default:
throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
}
@@ -99,6 +111,31 @@ public abstract class MessageDeserializer {
public abstract DropPartitionMessage getDropPartitionMessage(String messageBody);
/**
+ * Method to de-serialize CreateFunctionMessage instance.
+ */
+ public abstract CreateFunctionMessage getCreateFunctionMessage(String messageBody);
+
+ /**
+ * Method to de-serialize DropFunctionMessage instance.
+ */
+ public abstract DropFunctionMessage getDropFunctionMessage(String messageBody);
+
+ /**
+ * Method to de-serialize CreateIndexMessage instance.
+ */
+ public abstract CreateIndexMessage getCreateIndexMessage(String messageBody);
+
+ /**
+ * Method to de-serialize DropIndexMessage instance.
+ */
+ public abstract DropIndexMessage getDropIndexMessage(String messageBody);
+
+ /**
+ * Method to de-serialize AlterIndexMessage instance.
+ */
+ public abstract AlterIndexMessage getAlterIndexMessage(String messageBody);
+
+ /**
* Method to deserialize InsertMessage
* @param messageBody the message in serialized form
* @return message in object form
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
index 0710dd0..44574fe 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
@@ -22,6 +22,8 @@ package org.apache.hive.hcatalog.messaging;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.util.ReflectionUtils;
@@ -163,6 +165,42 @@ public abstract class MessageFactory {
public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions);
/**
+ * Factory method for CreateFunctionMessage.
+ * @param fn The Function being added.
+ * @return CreateFunctionMessage instance.
+ */
+ public abstract CreateFunctionMessage buildCreateFunctionMessage(Function fn);
+
+ /**
+ * Factory method for DropFunctionMessage.
+ * @param fn The Function being dropped.
+ * @return DropFunctionMessage instance.
+ */
+ public abstract DropFunctionMessage buildDropFunctionMessage(Function fn);
+
+ /**
+ * Factory method for CreateIndexMessage.
+ * @param idx The Index being added.
+ * @return CreateIndexMessage instance.
+ */
+ public abstract CreateIndexMessage buildCreateIndexMessage(Index idx);
+
+ /**
+ * Factory method for DropIndexMessage.
+ * @param idx The Index being dropped.
+ * @return DropIndexMessage instance.
+ */
+ public abstract DropIndexMessage buildDropIndexMessage(Index idx);
+
+ /**
+ * Factory method for AlterIndexMessage.
+ * @param before The index before the alter
+ * @param after The index after the alter
+ * @return AlterIndexMessage
+ */
+ public abstract AlterIndexMessage buildAlterIndexMessage(Index before, Index after);
+
+ /**
* Factory method for building insert message
* @param db Name of the database the insert occurred in
* @param table Name of the table the insert occurred in
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java
new file mode 100644
index 0000000..25b0987
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hive.hcatalog.messaging.AlterIndexMessage;
+import org.apache.thrift.TException;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * JSON Implementation of AlterIndexMessage.
+ */
+public class JSONAlterIndexMessage extends AlterIndexMessage {
+
+ @JsonProperty
+ String server, servicePrincipal, db, beforeIndexObjJson, afterIndexObjJson;
+
+ @JsonProperty
+ Long timestamp;
+
+ /**
+ * Default constructor, required for Jackson.
+ */
+ public JSONAlterIndexMessage() {}
+
+ public JSONAlterIndexMessage(String server, String servicePrincipal, Index before, Index after,
+ Long timestamp) {
+ this.server = server;
+ this.servicePrincipal = servicePrincipal;
+ this.db = after.getDbName();
+ this.timestamp = timestamp;
+ try {
+ this.beforeIndexObjJson = JSONMessageFactory.createIndexObjJson(before);
+ this.afterIndexObjJson = JSONMessageFactory.createIndexObjJson(after);
+ } catch (TException ex) {
+ throw new IllegalArgumentException("Could not serialize Index object", ex);
+ }
+
+ checkValid();
+ }
+
+ @Override
+ public String getDB() { return db; }
+
+ @Override
+ public String getServer() { return server; }
+
+ @Override
+ public String getServicePrincipal() { return servicePrincipal; }
+
+ @Override
+ public Long getTimestamp() { return timestamp; }
+
+ public String getBeforeIndexObjJson() {
+ return beforeIndexObjJson;
+ }
+
+ public String getAfterIndexObjJson() {
+ return afterIndexObjJson;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return JSONMessageDeserializer.mapper.writeValueAsString(this);
+ }
+ catch (Exception exception) {
+ throw new IllegalArgumentException("Could not serialize: ", exception);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
new file mode 100644
index 0000000..fb883fc
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hive.hcatalog.messaging.CreateFunctionMessage;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * JSON Implementation of CreateFunctionMessage.
+ */
+public class JSONCreateFunctionMessage extends CreateFunctionMessage {
+
+ @JsonProperty
+ String server, servicePrincipal, db, functionObjJson;
+
+ @JsonProperty
+ Long timestamp;
+
+ /**
+ * Default constructor, required for Jackson.
+ */
+ public JSONCreateFunctionMessage() {}
+
+ public JSONCreateFunctionMessage(String server, String servicePrincipal, Function fn, Long timestamp) {
+ this.server = server;
+ this.servicePrincipal = servicePrincipal;
+ this.db = fn.getDbName();
+ this.timestamp = timestamp;
+ try {
+ this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+ } catch (TException ex) {
+ throw new IllegalArgumentException("Could not serialize Function object", ex);
+ }
+ checkValid();
+ }
+
+ @Override
+ public String getDB() { return db; }
+
+ @Override
+ public String getServer() { return server; }
+
+ @Override
+ public String getServicePrincipal() { return servicePrincipal; }
+
+ @Override
+ public Long getTimestamp() { return timestamp; }
+
+ public String getFunctionObjJson() {
+ return functionObjJson;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return JSONMessageDeserializer.mapper.writeValueAsString(this);
+ }
+ catch (Exception exception) {
+ throw new IllegalArgumentException("Could not serialize: ", exception);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java
new file mode 100644
index 0000000..8d83149
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hive.hcatalog.messaging.CreateIndexMessage;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * JSON implementation of CreateIndexMessage. Alongside the server, principal,
+ * database and timestamp properties it carries the serialized index object.
+ */
+public class JSONCreateIndexMessage extends CreateIndexMessage {
+
+  // Message fields, each exposed to Jackson as a JSON property.
+  @JsonProperty
+  String server;
+
+  @JsonProperty
+  String servicePrincipal;
+
+  @JsonProperty
+  String db;
+
+  @JsonProperty
+  String indexObjJson;
+
+  @JsonProperty
+  Long timestamp;
+
+  /**
+   * No-argument constructor; Jackson requires it.
+   */
+  public JSONCreateIndexMessage() {
+  }
+
+  /**
+   * Builds the message for a created index.
+   *
+   * @param server value returned by {@link #getServer()}
+   * @param servicePrincipal value returned by {@link #getServicePrincipal()}
+   * @param index index being announced; supplies the database name and is stored in
+   *              serialized form
+   * @param timestamp value returned by {@link #getTimestamp()}
+   * @throws IllegalArgumentException if the index object cannot be serialized
+   */
+  public JSONCreateIndexMessage(String server, String servicePrincipal, Index index,
+      Long timestamp) {
+    this.server = server;
+    this.servicePrincipal = servicePrincipal;
+    this.db = index.getDbName();
+    this.timestamp = timestamp;
+    try {
+      this.indexObjJson = JSONMessageFactory.createIndexObjJson(index);
+    } catch (TException e) {
+      throw new IllegalArgumentException("Could not serialize Index object", e);
+    }
+    checkValid();
+  }
+
+  @Override
+  public String getDB() {
+    return db;
+  }
+
+  @Override
+  public String getServer() {
+    return server;
+  }
+
+  @Override
+  public String getServicePrincipal() {
+    return servicePrincipal;
+  }
+
+  @Override
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the serialized index object carried by this message
+   */
+  public String getIndexObjJson() {
+    return indexObjJson;
+  }
+
+  /**
+   * Renders this message as JSON using the shared deserializer mapper.
+   *
+   * @throws IllegalArgumentException if the message cannot be written as JSON
+   */
+  @Override
+  public String toString() {
+    try {
+      return JSONMessageDeserializer.mapper.writeValueAsString(this);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not serialize: ", e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
new file mode 100644
index 0000000..334e36f
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hive.hcatalog.messaging.DropFunctionMessage;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * JSON Implementation of DropFunctionMessage.
+ */
+public class JSONDropFunctionMessage extends DropFunctionMessage {
+
+  // Message fields, each exposed to Jackson as a JSON property.
+  @JsonProperty
+  String server, servicePrincipal, db, functionObjJson;
+
+  @JsonProperty
+  Long timestamp;
+
+  /**
+   * Default constructor, required for Jackson.
+   */
+  public JSONDropFunctionMessage() {}
+
+  /**
+   * Builds the message for a dropped function.
+   *
+   * @param server value returned by {@link #getServer()}
+   * @param servicePrincipal value returned by {@link #getServicePrincipal()}
+   * @param fn function that was dropped; supplies the database name and is stored in
+   *           serialized form
+   * @param timestamp value returned by {@link #getTimestamp()}
+   * @throws IllegalArgumentException if the function object cannot be serialized
+   */
+  public JSONDropFunctionMessage(String server, String servicePrincipal, Function fn,
+      Long timestamp) {
+    this.server = server;
+    this.servicePrincipal = servicePrincipal;
+    this.db = fn.getDbName();
+    this.timestamp = timestamp;
+    try {
+      this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+    } catch (TException ex) {
+      throw new IllegalArgumentException("Could not serialize Function object", ex);
+    }
+    checkValid();
+  }
+
+  @Override
+  public String getDB() { return db; }
+
+  @Override
+  public String getServer() { return server; }
+
+  @Override
+  public String getServicePrincipal() { return servicePrincipal; }
+
+  @Override
+  public Long getTimestamp() { return timestamp; }
+
+  /**
+   * @return the serialized function object carried by this message
+   */
+  public String getFunctionObjJson() {
+    return functionObjJson;
+  }
+
+  /**
+   * Renders this message as JSON using the shared deserializer mapper.
+   *
+   * @throws IllegalArgumentException if the message cannot be written as JSON
+   */
+  @Override
+  public String toString() {
+    try {
+      return JSONMessageDeserializer.mapper.writeValueAsString(this);
+    }
+    catch (Exception exception) {
+      throw new IllegalArgumentException("Could not serialize: ", exception);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java
new file mode 100644
index 0000000..bacaa1d
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hive.hcatalog.messaging.DropIndexMessage;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * JSON implementation of DropIndexMessage. Alongside the server, principal,
+ * database and timestamp properties it carries the serialized index object.
+ */
+public class JSONDropIndexMessage extends DropIndexMessage {
+
+  // Message fields, each exposed to Jackson as a JSON property.
+  @JsonProperty
+  String server;
+
+  @JsonProperty
+  String servicePrincipal;
+
+  @JsonProperty
+  String db;
+
+  @JsonProperty
+  String indexObjJson;
+
+  @JsonProperty
+  Long timestamp;
+
+  /**
+   * No-argument constructor; Jackson requires it.
+   */
+  public JSONDropIndexMessage() {
+  }
+
+  /**
+   * Builds the message for a dropped index.
+   *
+   * @param server value returned by {@link #getServer()}
+   * @param servicePrincipal value returned by {@link #getServicePrincipal()}
+   * @param index index that was dropped; supplies the database name and is stored in
+   *              serialized form
+   * @param timestamp value returned by {@link #getTimestamp()}
+   * @throws IllegalArgumentException if the index object cannot be serialized
+   */
+  public JSONDropIndexMessage(String server, String servicePrincipal, Index index,
+      Long timestamp) {
+    this.server = server;
+    this.servicePrincipal = servicePrincipal;
+    this.db = index.getDbName();
+    this.timestamp = timestamp;
+    try {
+      this.indexObjJson = JSONMessageFactory.createIndexObjJson(index);
+    } catch (TException e) {
+      throw new IllegalArgumentException("Could not serialize Index object", e);
+    }
+    checkValid();
+  }
+
+  @Override
+  public String getDB() {
+    return db;
+  }
+
+  @Override
+  public String getServer() {
+    return server;
+  }
+
+  @Override
+  public String getServicePrincipal() {
+    return servicePrincipal;
+  }
+
+  @Override
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the serialized index object carried by this message
+   */
+  public String getIndexObjJson() {
+    return indexObjJson;
+  }
+
+  /**
+   * Renders this message as JSON using the shared deserializer mapper.
+   *
+   * @throws IllegalArgumentException if the message cannot be written as JSON
+   */
+  @Override
+  public String toString() {
+    try {
+      return JSONMessageDeserializer.mapper.writeValueAsString(this);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not serialize: ", e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java
index 834fdde..bd45d09 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java
@@ -19,13 +19,17 @@
package org.apache.hive.hcatalog.messaging.json;
-import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hive.hcatalog.messaging.AlterIndexMessage;
import org.apache.hive.hcatalog.messaging.AlterPartitionMessage;
import org.apache.hive.hcatalog.messaging.AlterTableMessage;
import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hive.hcatalog.messaging.CreateFunctionMessage;
+import org.apache.hive.hcatalog.messaging.CreateIndexMessage;
import org.apache.hive.hcatalog.messaging.CreateTableMessage;
import org.apache.hive.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hive.hcatalog.messaging.DropFunctionMessage;
+import org.apache.hive.hcatalog.messaging.DropIndexMessage;
import org.apache.hive.hcatalog.messaging.DropPartitionMessage;
import org.apache.hive.hcatalog.messaging.DropTableMessage;
import org.apache.hive.hcatalog.messaging.InsertMessage;
@@ -125,6 +129,56 @@ public class JSONMessageDeserializer extends MessageDeserializer {
}
@Override
+  public CreateFunctionMessage getCreateFunctionMessage(String messageBody) {
+    // Map the JSON body onto the concrete message class; any failure is reported
+    // to the caller as an IllegalArgumentException that keeps the cause.
+    try {
+      return mapper.readValue(messageBody, JSONCreateFunctionMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct JSONCreateFunctionMessage.", e);
+    }
+  }
+
+  /**
+   * Parses a JSON message body into a drop-function message.
+   *
+   * @param messageBody JSON text of the message
+   * @return the deserialized {@link JSONDropFunctionMessage}
+   * @throws IllegalArgumentException if the body cannot be mapped onto the message class
+   */
+  @Override
+  public DropFunctionMessage getDropFunctionMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONDropFunctionMessage.class);
+    }
+    catch (Exception exception) {
+      // Name the class that actually failed to deserialize.
+      throw new IllegalArgumentException("Could not construct JSONDropFunctionMessage.", exception);
+    }
+  }
+
+  /**
+   * Parses a JSON message body into a create-index message.
+   *
+   * @param messageBody JSON text of the message
+   * @return the deserialized {@link JSONCreateIndexMessage}
+   * @throws IllegalArgumentException if the body cannot be mapped onto the message class
+   */
+  @Override
+  public CreateIndexMessage getCreateIndexMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONCreateIndexMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct JSONCreateIndexMessage.", e);
+    }
+  }
+
+  /**
+   * Parses a JSON message body into a drop-index message.
+   *
+   * @param messageBody JSON text of the message
+   * @return the deserialized {@link JSONDropIndexMessage}
+   * @throws IllegalArgumentException if the body cannot be mapped onto the message class
+   */
+  @Override
+  public DropIndexMessage getDropIndexMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONDropIndexMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct JSONDropIndexMessage.", e);
+    }
+  }
+
+  /**
+   * Parses a JSON message body into an alter-index message.
+   *
+   * @param messageBody JSON text of the message
+   * @return the deserialized {@link JSONAlterIndexMessage}
+   * @throws IllegalArgumentException if the body cannot be mapped onto the message class
+   */
+  @Override
+  public AlterIndexMessage getAlterIndexMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONAlterIndexMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct JSONAlterIndexMessage.", e);
+    }
+  }
+
+ @Override
public InsertMessage getInsertMessage(String messageBody) {
try {
return mapper.readValue(messageBody, JSONInsertMessage.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
index 6b74b54..251084f 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
@@ -19,12 +19,17 @@
package org.apache.hive.hcatalog.messaging.json;
-import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hive.hcatalog.messaging.AlterIndexMessage;
+import org.apache.hive.hcatalog.messaging.CreateFunctionMessage;
+import org.apache.hive.hcatalog.messaging.CreateIndexMessage;
+import org.apache.hive.hcatalog.messaging.DropFunctionMessage;
+import org.apache.hive.hcatalog.messaging.DropIndexMessage;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
@@ -38,9 +43,13 @@ import org.apache.hive.hcatalog.messaging.DropTableMessage;
import org.apache.hive.hcatalog.messaging.InsertMessage;
import org.apache.hive.hcatalog.messaging.MessageDeserializer;
import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -54,7 +63,6 @@ public class JSONMessageFactory extends MessageFactory {
private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName());
-
private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
@Override
@@ -121,6 +129,36 @@ public class JSONMessageFactory extends MessageFactory {
}
@Override
+  public CreateFunctionMessage buildCreateFunctionMessage(Function fn) {
+    // Stamp the message with this server's URL and principal and the current time.
+    return new JSONCreateFunctionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, fn, now());
+  }
+
+  /**
+   * Creates the JSON message announcing that {@code fn} was dropped.
+   */
+  @Override
+  public DropFunctionMessage buildDropFunctionMessage(Function fn) {
+    return new JSONDropFunctionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, fn, now());
+  }
+
+  /**
+   * Creates the JSON message announcing that {@code idx} was created.
+   */
+  @Override
+  public CreateIndexMessage buildCreateIndexMessage(Index idx) {
+    return new JSONCreateIndexMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, idx, now());
+  }
+
+  /**
+   * Creates the JSON message announcing that {@code idx} was dropped.
+   */
+  @Override
+  public DropIndexMessage buildDropIndexMessage(Index idx) {
+    return new JSONDropIndexMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, idx, now());
+  }
+
+  /**
+   * Creates the JSON message announcing an index alteration.
+   *
+   * @param before index definition prior to the change
+   * @param after index definition once the change is applied
+   */
+  @Override
+  public AlterIndexMessage buildAlterIndexMessage(Index before, Index after) {
+    return new JSONAlterIndexMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before, after, now());
+  }
+
+ @Override
public InsertMessage buildInsertMessage(String db, String table, Map<String,String> partKeyVals,
List<String> files) {
return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, partKeyVals,
@@ -140,11 +178,21 @@ public class JSONMessageFactory extends MessageFactory {
}
private static List<Map<String, String>> getPartitionKeyValues(final Table table, Iterator<Partition> iterator) {
- return Lists.newArrayList(Iterators.transform(iterator, new Function<Partition, Map<String, String>>() {
+ return Lists.newArrayList(Iterators.transform(iterator, new com.google.common.base.Function<Partition, Map<String, String>>() {
@Override
public Map<String, String> apply(@Nullable Partition partition) {
return getPartitionKeyValues(table, partition);
}
}));
}
-}
+
+  /**
+   * Serializes a metastore Function with Thrift's JSON protocol.
+   *
+   * @param functionObj function to serialize
+   * @return the UTF-8 string form produced by the Thrift serializer
+   * @throws TException if Thrift serialization fails
+   */
+  static String createFunctionObjJson(Function functionObj) throws TException {
+    return new TSerializer(new TJSONProtocol.Factory()).toString(functionObj, "UTF-8");
+  }
+
+  /**
+   * Serializes a metastore Index with Thrift's JSON protocol.
+   *
+   * @param indexObj index to serialize
+   * @return the UTF-8 string form produced by the Thrift serializer
+   * @throws TException if Thrift serialization fails
+   */
+  static String createIndexObjJson(Index indexObj) throws TException {
+    return new TSerializer(new TJSONProtocol.Factory()).toString(indexObj, "UTF-8");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 4a7801b..5282a5a 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -300,7 +300,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
@Override
public boolean addIndex(Index index) throws InvalidObjectException, MetaException {
- return objectStore.addIndex(index);
+ if (shouldEventSucceed) {
+ return objectStore.addIndex(index);
+ } else {
+ throw new RuntimeException("Event failed.");
+ }
}
@Override
@@ -312,7 +316,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
@Override
public boolean dropIndex(String dbName, String origTableName, String indexName)
throws MetaException {
- return objectStore.dropIndex(dbName, origTableName, indexName);
+ if (shouldEventSucceed) {
+ return objectStore.dropIndex(dbName, origTableName, indexName);
+ } else {
+ throw new RuntimeException("Event failed.");
+ }
}
@Override
@@ -330,7 +338,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
@Override
public void alterIndex(String dbName, String baseTblName, String name, Index newIndex)
throws InvalidObjectException, MetaException {
- objectStore.alterIndex(dbName, baseTblName, name, newIndex);
+ if (shouldEventSucceed) {
+ objectStore.alterIndex(dbName, baseTblName, name, newIndex);
+ } else {
+ throw new RuntimeException("Event failed.");
+ }
}
@Override
@@ -751,7 +763,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
@Override
public void createFunction(Function func) throws InvalidObjectException,
MetaException {
- objectStore.createFunction(func);
+ if (shouldEventSucceed) {
+ objectStore.createFunction(func);
+ } else {
+ throw new RuntimeException("Event failed.");
+ }
}
@Override
@@ -764,7 +780,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
public void dropFunction(String dbName, String funcName)
throws MetaException, NoSuchObjectException, InvalidObjectException,
InvalidInputException {
- objectStore.dropFunction(dbName, funcName);
+ if (shouldEventSucceed) {
+ objectStore.dropFunction(dbName, funcName);
+ } else {
+ throw new RuntimeException("Event failed.");
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 1cd32d5..4f97cf4 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -23,8 +23,20 @@ import static junit.framework.Assert.assertNull;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.htrace.fasterxml.jackson.core.JsonFactory;
+import org.apache.htrace.fasterxml.jackson.core.JsonParser;
+import org.apache.htrace.fasterxml.jackson.databind.JsonNode;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.htrace.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -46,6 +58,8 @@ import org.apache.hive.hcatalog.common.HCatConstants;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -55,7 +69,6 @@ import java.util.List;
import java.util.Map;
public class TestDbNotificationListener {
-
private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName());
private static final int EVENTS_TTL = 30;
private static final int CLEANUP_SLEEP_TIME = 10;
@@ -402,7 +415,7 @@ public class TestDbNotificationListener {
partition = new Partition(Arrays.asList("tomorrow"), "default", "dropPartTable",
startTime, startTime, sd, emptyParameters);
- msClient.add_partition(partition);
+ msClient.add_partition(partition);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
msClient.dropPartition("default", "dropparttable", Arrays.asList("tomorrow"), false);
@@ -415,6 +428,244 @@ public class TestDbNotificationListener {
}
@Test
+  public void createFunction() throws Exception {
+    // Create a function and expect exactly one CREATE_FUNCTION notification describing it.
+    String funcName = "createFunction";
+    String dbName = "default";
+    String ownerName = "me";
+    String funcClass = "o.a.h.h.myfunc";
+    String funcResource = "file:/tmp/somewhere";
+    ResourceUri jarResource = new ResourceUri(ResourceType.JAR, funcResource);
+    Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER,
+        startTime, FunctionType.JAVA, Arrays.asList(jarResource));
+    msClient.createFunction(func);
+
+    NotificationEventResponse response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, response.getEventsSize());
+    NotificationEvent notification = response.getEvents().get(0);
+    assertEquals(firstEventId + 1, notification.getEventId());
+    assertTrue(notification.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_CREATE_FUNCTION_EVENT, notification.getEventType());
+    assertEquals(dbName, notification.getDbName());
+
+    // The message body embeds the Function object; decode it and compare field by field.
+    Function decoded = getFunctionObj(getJsonTree(notification));
+    assertEquals(dbName, decoded.getDbName());
+    assertEquals(funcName, decoded.getFunctionName());
+    assertEquals(funcClass, decoded.getClassName());
+    assertEquals(ownerName, decoded.getOwnerName());
+    assertEquals(FunctionType.JAVA, decoded.getFunctionType());
+    assertEquals(1, decoded.getResourceUrisSize());
+    ResourceUri decodedResource = decoded.getResourceUris().get(0);
+    assertEquals(ResourceType.JAR, decodedResource.getResourceType());
+    assertEquals(funcResource, decodedResource.getUri());
+
+    // With the raw store switched to fail, a second create must not add a notification.
+    DummyRawStoreFailEvent.setEventSucceed(false);
+    ResourceUri secondJar = new ResourceUri(ResourceType.JAR, "file:/tmp/somewhere2");
+    Function failingFunc = new Function("createFunction2", dbName, "o.a.h.h.myfunc2", "me",
+        PrincipalType.USER, startTime, FunctionType.JAVA, Arrays.asList(secondJar));
+    try {
+      msClient.createFunction(failingFunc);
+    } catch (Exception ex) {
+      // expected
+    }
+
+    response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, response.getEventsSize());
+  }
+
+  @Test
+  public void dropFunction() throws Exception {
+    // Create then drop a function; the second notification must be the DROP_FUNCTION event.
+    String funcName = "dropfunctiontest";
+    String dbName = "default";
+    String ownerName = "me";
+    String funcClass = "o.a.h.h.dropFunctionTest";
+    String funcResource = "file:/tmp/somewhere";
+    ResourceUri jarResource = new ResourceUri(ResourceType.JAR, funcResource);
+    Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER,
+        startTime, FunctionType.JAVA, Arrays.asList(jarResource));
+    msClient.createFunction(func);
+    msClient.dropFunction(dbName, funcName);
+
+    NotificationEventResponse response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, response.getEventsSize());
+    NotificationEvent notification = response.getEvents().get(1);
+    assertEquals(firstEventId + 2, notification.getEventId());
+    assertTrue(notification.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_FUNCTION_EVENT, notification.getEventType());
+    assertEquals(dbName, notification.getDbName());
+
+    // The message body embeds the dropped Function object; decode it and compare.
+    Function decoded = getFunctionObj(getJsonTree(notification));
+    assertEquals(dbName, decoded.getDbName());
+    assertEquals(funcName, decoded.getFunctionName());
+    assertEquals(funcClass, decoded.getClassName());
+    assertEquals(ownerName, decoded.getOwnerName());
+    assertEquals(FunctionType.JAVA, decoded.getFunctionType());
+    assertEquals(1, decoded.getResourceUrisSize());
+    ResourceUri decodedResource = decoded.getResourceUris().get(0);
+    assertEquals(ResourceType.JAR, decodedResource.getResourceType());
+    assertEquals(funcResource, decodedResource.getUri());
+
+    // A second function is created successfully (third event); its drop is then forced
+    // to fail and must leave the event count unchanged.
+    ResourceUri secondJar = new ResourceUri(ResourceType.JAR, "file:/tmp/somewhere2");
+    Function secondFunc = new Function("dropfunctiontest2", dbName, "o.a.h.h.dropFunctionTest2",
+        "me", PrincipalType.USER, startTime, FunctionType.JAVA, Arrays.asList(secondJar));
+    msClient.createFunction(secondFunc);
+    DummyRawStoreFailEvent.setEventSucceed(false);
+    try {
+      msClient.dropFunction(dbName, "dropfunctiontest2");
+    } catch (Exception ex) {
+      // expected
+    }
+
+    response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(3, response.getEventsSize());
+  }
+
+  @Test
+  public void createIndex() throws Exception {
+    String indexName = "createIndex";
+    String dbName = "default";
+    String tableName = "createIndexTable";
+    String indexTableName = tableName + "__" + indexName + "__";
+    int startTime = (int)(System.currentTimeMillis() / 1000);
+
+    // One storage descriptor is shared by the base table, the index and the index table.
+    List<FieldSchema> columns = new ArrayList<FieldSchema>();
+    columns.add(new FieldSchema("col1", "int", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    Map<String, String> sdParams = new HashMap<String, String>();
+    sdParams.put("key", "value");
+    StorageDescriptor sd = new StorageDescriptor(columns, "file:/tmp", "input", "output", false,
+        17, serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), sdParams);
+
+    Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+    Index index = new Index(indexName, null, "default", tableName, startTime, startTime,
+        indexTableName, sd, emptyParameters, false);
+    Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createIndex(index, indexTable);
+
+    // Three events are expected in total; the last one is the CREATE_INDEX event.
+    NotificationEventResponse response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(3, response.getEventsSize());
+    NotificationEvent notification = response.getEvents().get(2);
+    assertEquals(firstEventId + 3, notification.getEventId());
+    assertTrue(notification.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_CREATE_INDEX_EVENT, notification.getEventType());
+    assertEquals(dbName, notification.getDbName());
+    Index decoded = getIndexObj(getJsonTree(notification));
+    assertEquals(dbName, decoded.getDbName());
+    assertEquals(indexName, decoded.getIndexName());
+    assertEquals(tableName, decoded.getOrigTableName());
+    assertEquals(indexTableName, decoded.getIndexTableName());
+
+    // With the raw store switched to fail, a second create must not add any notification.
+    DummyRawStoreFailEvent.setEventSucceed(false);
+    Index failingIndex = new Index("createIndexTable2", null, "default", tableName, startTime,
+        startTime, "createIndexTable2__createIndexTable2__", sd, emptyParameters, false);
+    Table indexTable2 = new Table("createIndexTable2__createIndexTable2__", dbName, "me",
+        startTime, startTime, 0, sd, null, emptyParameters, null, null, null);
+    try {
+      msClient.createIndex(failingIndex, indexTable2);
+    } catch (Exception ex) {
+      // expected
+    }
+
+    response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(3, response.getEventsSize());
+  }
+
+  @Test
+  public void dropIndex() throws Exception {
+    // Create a table and an index, drop the index, and check the DROP_INDEX notification.
+    String indexName = "dropIndex";
+    String dbName = "default";
+    String tableName = "dropIndexTable";
+    String indexTableName = tableName + "__" + indexName + "__";
+    int startTime = (int)(System.currentTimeMillis() / 1000);
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    Map<String, String> params = new HashMap<String, String>();
+    params.put("key", "value");
+    StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17,
+        serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
+    Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+    Index index = new Index(indexName, null, "default", tableName, startTime, startTime,
+        indexTableName, sd, emptyParameters, false);
+    Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createIndex(index, indexTable);
+    msClient.dropIndex(dbName, tableName, indexName, true); // drops index and indexTable
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(4, rsp.getEventsSize());
+    NotificationEvent event = rsp.getEvents().get(3);
+    assertEquals(firstEventId + 4, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_INDEX_EVENT, event.getEventType());
+    assertEquals(dbName, event.getDbName());
+    // Names in the drop message are compared in lower case.
+    Index indexObj = getIndexObj(getJsonTree(event));
+    assertEquals(dbName, indexObj.getDbName());
+    assertEquals(indexName.toLowerCase(), indexObj.getIndexName());
+    assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName());
+    assertEquals(indexTableName.toLowerCase(), indexObj.getIndexTableName());
+
+    // Second index: created successfully (two more events), then its drop is forced to fail.
+    String secondIndexName = "dropIndexTable2";
+    String secondIndexTableName = "dropIndexTable__dropIndexTable2__";
+    index = new Index(secondIndexName, null, "default", tableName, startTime, startTime,
+        secondIndexTableName, sd, emptyParameters, false);
+    Table indexTable2 = new Table(secondIndexTableName, dbName, "me", startTime,
+        startTime, 0, sd, null, emptyParameters, null, null, null);
+    msClient.createIndex(index, indexTable2);
+    DummyRawStoreFailEvent.setEventSucceed(false);
+    try {
+      // Drop the index that was just created. A name that was never created would
+      // presumably be rejected as missing before the failing raw store is reached.
+      msClient.dropIndex(dbName, tableName, secondIndexName, true); // drops index and indexTable
+    } catch (Exception ex) {
+      // expected
+    }
+
+    rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(6, rsp.getEventsSize());
+  }
+
+  @Test
+  public void alterIndex() throws Exception {
+    String indexName = "alterIndex";
+    String dbName = "default";
+    String tableName = "alterIndexTable";
+    String indexTableName = tableName + "__" + indexName + "__";
+    int startTime = (int)(System.currentTimeMillis() / 1000);
+
+    // One storage descriptor is shared by the base table, both index versions and the
+    // index table.
+    List<FieldSchema> columns = new ArrayList<FieldSchema>();
+    columns.add(new FieldSchema("col1", "int", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    Map<String, String> sdParams = new HashMap<String, String>();
+    sdParams.put("key", "value");
+    StorageDescriptor sd = new StorageDescriptor(columns, "file:/tmp", "input", "output", false,
+        17, serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), sdParams);
+
+    Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
+        emptyParameters, null, null, null);
+    msClient.createTable(table);
+    Index oldIndex = new Index(indexName, null, "default", tableName, startTime, startTime,
+        indexTableName, sd, emptyParameters, false);
+    Table oldIndexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd,
+        null, emptyParameters, null, null, null);
+    msClient.createIndex(oldIndex, oldIndexTable); // creates index and index table
+
+    // The altered index differs only in its last-access time (one second later).
+    Index newIndex = new Index(indexName, null, "default", tableName, startTime, startTime + 1,
+        indexTableName, sd, emptyParameters, false);
+    msClient.alter_index(dbName, tableName, indexName, newIndex);
+
+    NotificationEventResponse response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(4, response.getEventsSize());
+    NotificationEvent notification = response.getEvents().get(3);
+    assertEquals(firstEventId + 4, notification.getEventId());
+    assertTrue(notification.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_ALTER_INDEX_EVENT, notification.getEventType());
+    assertEquals(dbName, notification.getDbName());
+
+    // Only the "after" image of the index is decoded and checked here.
+    Index decoded = getIndexObj(getJsonTree(notification), "afterIndexObjJson");
+    assertEquals(dbName, decoded.getDbName());
+    assertEquals(indexName, decoded.getIndexName());
+    assertEquals(tableName, decoded.getOrigTableName());
+    assertEquals(indexTableName, decoded.getIndexTableName());
+    assertTrue(decoded.getCreateTime() < decoded.getLastAccessTime());
+
+    // With the raw store switched to fail, repeating the alter must not add a notification.
+    DummyRawStoreFailEvent.setEventSucceed(false);
+    try {
+      msClient.alter_index(dbName, tableName, indexName, newIndex);
+    } catch (Exception ex) {
+      // expected
+    }
+
+    response = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(4, response.getEventsSize());
+  }
+
+ @Test
public void insertTable() throws Exception {
List<FieldSchema> cols = new ArrayList<FieldSchema>();
cols.add(new FieldSchema("col1", "int", "nocomment"));
@@ -700,7 +951,6 @@ public class TestDbNotificationListener {
assertEquals(firstEventId + 24, event.getEventId());
assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
-
}
@Test
@@ -721,4 +971,30 @@ public class TestDbNotificationListener {
LOG.info("second trigger done");
assertEquals(0, rsp2.getEventsSize());
}
+
+ /**
+  * Parses the JSON message carried by a notification event into a tree.
+  *
+  * @param event notification event whose message (getMessage()) is parsed
+  * @return the message mapped to an ObjectNode
+  * @throws Exception if the message cannot be parsed or mapped to an ObjectNode
+  */
+ private ObjectNode getJsonTree(NotificationEvent event) throws Exception {
+   String message = event.getMessage();
+   JsonFactory factory = new JsonFactory();
+   JsonParser parser = factory.createJsonParser(message);
+   return new ObjectMapper().readValue(parser, ObjectNode.class);
+ }
+
+ /**
+  * Rebuilds the thrift Function embedded in an event's JSON tree.
+  *
+  * @param jsonTree parsed event message; its "functionObjJson" field is read as text
+  * @return a Function populated by deserializing that text with the thrift JSON protocol
+  * @throws Exception if deserialization fails
+  */
+ private Function getFunctionObj(JsonNode jsonTree) throws Exception {
+   String functionJson = jsonTree.get("functionObjJson").asText();
+   Function function = new Function();
+   new TDeserializer(new TJSONProtocol.Factory()).deserialize(function, functionJson, "UTF-8");
+   return function;
+ }
+
+ private Index getIndexObj(JsonNode jsonTree) throws Exception {
+ return getIndexObj(jsonTree, "indexObjJson");
+ }
+
+ private Index getIndexObj(JsonNode jsonTree, String indexObjKey) throws Exception {
+ TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
+ Index indexObj = new Index();
+ String tableJson = jsonTree.get(indexObjKey).asText();
+ deSerializer.deserialize(indexObj, tableJson, "UTF-8");
+ return indexObj;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index c0ef25e..48bebb2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -176,8 +176,10 @@ import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
@@ -3909,7 +3911,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
+ " idx=" + index_name + " newidx=" + newIndex.getIndexName());
newIndex.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
.currentTimeMillis() / 1000));
-
boolean success = false;
Exception ex = null;
Index oldIndex = null;
@@ -3917,9 +3918,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
try {
ms.openTransaction();
oldIndex = get_index_by_name(dbname, base_table_name, index_name);
-
firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
-
ms.alterIndex(dbname, base_table_name, index_name, newIndex);
if (transactionalListeners.size() > 0) {
AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this);
@@ -4585,16 +4584,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
private Index add_index_core(final RawStore ms, final Index index, final Table indexTable)
throws InvalidObjectException, AlreadyExistsException, MetaException {
-
boolean success = false, indexTableCreated = false;
-
String[] qualified =
MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName());
-
try {
ms.openTransaction();
firePreEvent(new PreAddIndexEvent(index, this));
-
Index old_index = null;
try {
old_index = get_index_by_name(index.getDbName(), index
@@ -4628,7 +4623,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
index.setCreateTime((int) time);
index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
-
if (ms.addIndex(index)) {
if (transactionalListeners.size() > 0) {
AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
@@ -4687,21 +4681,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
final String dbName, final String tblName,
final String indexName, final boolean deleteData) throws NoSuchObjectException,
MetaException, TException, IOException, InvalidObjectException, InvalidInputException {
-
boolean success = false;
Index index = null;
Path tblPath = null;
List<Path> partPaths = null;
try {
ms.openTransaction();
-
// drop the underlying index table
index = get_index_by_name(dbName, tblName, indexName); // throws exception if not exists
-
firePreEvent(new PreDropIndexEvent(index, this));
-
ms.dropIndex(dbName, tblName, indexName);
-
String idxTblName = index.getIndexTableName();
if (idxTblName != null) {
String[] qualified = MetaStoreUtils.getQualifiedName(index.getDbName(), idxTblName);
@@ -4722,7 +4711,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Drop the partitions and get a list of partition locations which need to be deleted
partPaths = dropPartitionsAndGetLocations(ms, qualified[0], qualified[1], tblPath,
tbl.getPartitionKeys(), deleteData);
-
if (!ms.dropTable(qualified[0], qualified[1])) {
throw new MetaException("Unable to drop underlying data table "
+ qualified[0] + "." + qualified[1] + " for index " + indexName);
@@ -6181,31 +6169,43 @@ public class HiveMetaStore extends ThriftHiveMetastore {
InvalidObjectException, MetaException, NoSuchObjectException,
TException {
validateFunctionInfo(func);
-
boolean success = false;
RawStore ms = getMS();
try {
ms.openTransaction();
-
Database db = ms.getDatabase(func.getDbName());
if (db == null) {
throw new NoSuchObjectException("The database " + func.getDbName() + " does not exist");
}
+
Function existingFunc = ms.getFunction(func.getDbName(), func.getFunctionName());
if (existingFunc != null) {
throw new AlreadyExistsException(
"Function " + func.getFunctionName() + " already exists");
}
- // set create time
long time = System.currentTimeMillis() / 1000;
func.setCreateTime((int) time);
ms.createFunction(func);
+ if (transactionalListeners.size() > 0) {
+ CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onCreateFunction(createFunctionEvent);
+ }
+ }
+
success = ms.commitTransaction();
} finally {
if (!success) {
ms.rollbackTransaction();
}
+
+ if (listeners.size() > 0) {
+ CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onCreateFunction(createFunctionEvent);
+ }
+ }
}
}
@@ -6216,20 +6216,33 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean success = false;
Function func = null;
RawStore ms = getMS();
-
try {
ms.openTransaction();
-
func = ms.getFunction(dbName, funcName);
if (func == null) {
throw new NoSuchObjectException("Function " + funcName + " does not exist");
}
+
ms.dropFunction(dbName, funcName);
+ if (transactionalListeners.size() > 0) {
+ DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onDropFunction(dropFunctionEvent);
+ }
+ }
+
success = ms.commitTransaction();
} finally {
if (!success) {
ms.rollbackTransaction();
}
+
+ if (listeners.size() > 0) {
+ DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onDropFunction(dropFunctionEvent);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
index 5e46ae1..b0defb5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
@@ -145,6 +147,20 @@ public abstract class MetaStoreEventListener implements Configurable {
}
/**
+ * Callback for function-creation events. This base implementation is empty, so a
+ * listener overrides it only if it wants to react to the event.
+ *
+ * @param fnEvent function event; exposes the function through getFunction()
+ * @throws MetaException never thrown by this empty default; presumably declared so that
+ *         overriding listeners can report failures (confirm against callers)
+ */
+ public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException {
+ }
+
+ /**
+ * Callback for function-drop events. This base implementation is empty, so a
+ * listener overrides it only if it wants to react to the event.
+ *
+ * @param fnEvent function event; exposes the function through getFunction().
+ *         NOTE(review): the handler appears to build the event from a variable that is
+ *         still null when the function lookup fails, so overriders should probably
+ *         tolerate a null function on failed drops - confirm against the handler code
+ * @throws MetaException never thrown by this empty default; presumably declared so that
+ *         overriding listeners can report failures (confirm against callers)
+ */
+ public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException {
+ }
+
+ /**
* This will be called when an insert is executed that does not cause a partition to be added.
* If an insert causes a partition to be added it will cause {@link #onAddPartition} to be
* called instead.
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java
new file mode 100644
index 0000000..717ede2
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.api.Function;
+
+public class CreateFunctionEvent extends ListenerEvent {
+
+ private final Function function;
+
+ public CreateFunctionEvent (Function function, boolean status, HMSHandler handler) {
+ super (status, handler);
+ this.function = function;
+ }
+
+ /**
+ * @return the function
+ */
+ public Function getFunction () {
+ return function;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java
new file mode 100644
index 0000000..7190aae
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.api.Function;
+
+public class DropFunctionEvent extends ListenerEvent {
+
+ private final Function function;
+
+ public DropFunctionEvent(Function function, boolean status, HMSHandler handler) {
+ super(status, handler);
+ this.function = function;
+ }
+
+ /**
+ * @return the function
+ */
+ public Function getFunction() {
+ return function;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java
index a3b16d0..182e724 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
@@ -126,6 +128,16 @@ public class DummyListener extends MetaStoreEventListener{
addEvent(indexEvent);
}
+ /** Records the received create-function event by passing it to addEvent. */
+ @Override
+ public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
+   addEvent(fnEvent);
+ }
+
+ /** Records the received drop-function event by passing it to addEvent. */
+ @Override
+ public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
+   addEvent(fnEvent);
+ }
+
// Appends the given event to notifyList; every listener callback visible here funnels through it.
private void addEvent(ListenerEvent event) {
notifyList.add(event);
}