You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/30 12:05:37 UTC
hive git commit: HIVE-20209: Metastore connection fails for first
attempt in repl dump (Sankar Hariappan, reviewed by Mahesh Kumar Behera,
Anishek Agarwal)
Repository: hive
Updated Branches:
refs/heads/master d3b036920 -> 1203ee834
HIVE-20209: Metastore connection fails for first attempt in repl dump (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1203ee83
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1203ee83
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1203ee83
Branch: refs/heads/master
Commit: 1203ee834d709d4710fd6a41daaeb6da48c4d8f6
Parents: d3b0369
Author: Sankar Hariappan <sa...@apache.org>
Authored: Mon Jul 30 17:35:21 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Mon Jul 30 17:35:21 2018 +0530
----------------------------------------------------------------------
.../api/repl/ReplicationV1CompatRule.java | 29 ++-
.../hive/ql/parse/TestReplicationScenarios.java | 2 +-
.../ql/cache/results/QueryResultsCache.java | 4 -
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 31 +--
.../apache/hadoop/hive/ql/metadata/Hive.java | 5 +-
.../hive/ql/metadata/events/EventUtils.java | 203 +++++++++++++++++++
.../metadata/events/NotificationEventPoll.java | 7 +-
.../hive/metastore/messaging/EventUtils.java | 202 ------------------
8 files changed, 230 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
index d5e227c..550a5e5 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
@@ -22,8 +22,10 @@ import com.google.common.primitives.Ints;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.api.HCatNotificationEvent;
import org.apache.thrift.TException;
@@ -43,6 +45,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Utility class to enable testing of Replv1 compatibility testing.
*
@@ -65,6 +70,8 @@ public class ReplicationV1CompatRule implements TestRule {
private HiveConf hconf = null;
private List<String> testsToSkip = null;
+ private Hive hiveDb;
+
public ReplicationV1CompatRule(IMetaStoreClient metaStoreClient, HiveConf hconf){
this(metaStoreClient, hconf, new ArrayList<String>());
}
@@ -79,6 +86,7 @@ public class ReplicationV1CompatRule implements TestRule {
};
this.testsToSkip = testsToSkip;
LOG.info("Replv1 backward compatibility tester initialized at " + testEventId.get());
+ this.hiveDb = mock(Hive.class);
}
private Long getCurrentNotificationId(){
@@ -137,16 +145,17 @@ public class ReplicationV1CompatRule implements TestRule {
return true;
}
};
- EventUtils.MSClientNotificationFetcher evFetcher =
- new EventUtils.MSClientNotificationFetcher(metaStoreClient);
try {
+ when(hiveDb.getMSC()).thenReturn(metaStoreClient);
+ EventUtils.MSClientNotificationFetcher evFetcher =
+ new EventUtils.MSClientNotificationFetcher(hiveDb);
EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
- evFetcher, testEventIdBefore,
- Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1,
- evFilter);
+ evFetcher, testEventIdBefore,
+ Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1,
+ evFilter);
ReplicationTask.resetFactory(null);
- assertTrue("We should have found some events",evIter.hasNext());
- while (evIter.hasNext()){
+ assertTrue("We should have found some events", evIter.hasNext());
+ while (evIter.hasNext()) {
eventCount++;
NotificationEvent ev = evIter.next();
// convert to HCatNotificationEvent, and then try to instantiate a ReplicationTask on it.
@@ -155,11 +164,11 @@ public class ReplicationV1CompatRule implements TestRule {
if (rtask instanceof ErroredReplicationTask) {
unhandledTasks.put(ev, ((ErroredReplicationTask) rtask).getCause());
}
- } catch (RuntimeException re){
+ } catch (RuntimeException re) {
incompatibleTasks.put(ev, re);
}
}
- } catch (IOException e) {
+ } catch (IOException | MetaException e) {
assertNull("Got an exception when we shouldn't have - replv1 backward incompatibility issue:",e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index f0098bf..3d509f3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1569,7 +1569,7 @@ public class TestReplicationScenarios {
run("USE " + replDbName, driverMirror);
verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
"location", "namelist/year=1990/month=5/day=25", driverMirror);
- run("USE " + dbName, driverMirror);
+ run("USE " + dbName, driver);
String[] ptn_data_3 = new String[] { "abraham", "bob", "carter", "david", "fisher" };
String[] data_after_ovwrite = new String[] { "fisher" };
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 44f7bf8..1ca7c11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -37,14 +37,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -62,13 +60,11 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.Entity.Type;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.events.EventConsumer;
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 9a5e6df..f24bcbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.repl;
-import com.google.common.primitives.Ints;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -33,7 +31,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
@@ -48,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
@@ -161,7 +159,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
EventUtils.MSClientNotificationFetcher evFetcher
- = new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+ = new EventUtils.MSClientNotificationFetcher(getHive());
EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
@@ -267,30 +265,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
replLogger.endLog(bootDumpBeginReplId.toString());
}
Long bootDumpEndReplId = currentNotificationId(hiveDb);
- LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
- bootDumpEndReplId);
-
- // Now that bootstrap has dumped all objects related, we have to account for the changes
- // that occurred while bootstrap was happening - i.e. we have to look through all events
- // during the bootstrap period and consolidate them with our dump.
-
- IMetaStoreClient.NotificationFilter evFilter =
- new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
- EventUtils.MSClientNotificationFetcher evFetcher =
- new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC());
- EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
- evFetcher, bootDumpBeginReplId,
- Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
- evFilter);
-
- // Now we consolidate all the events that happenned during the objdump into the objdump
- while (evIter.hasNext()) {
- NotificationEvent ev = evIter.next();
- Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
- // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
- }
- LOG.info(
- "Consolidation done, preparing to return {},{}->{}",
+ LOG.info("Preparing to return {},{}->{}",
dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
dmd.write();
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e577f5e..239a606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -400,9 +400,8 @@ public class Hive {
metaStoreClient.close();
metaStoreClient = null;
}
- if (syncMetaStoreClient != null) {
- syncMetaStoreClient.close();
- }
+ // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to close it once.
+ syncMetaStoreClient = null;
if (owner != null) {
owner = null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
new file mode 100644
index 0000000..66abd51
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.ql.metadata.events;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class EventUtils {
+
+ public interface NotificationFetcher {
+ int getBatchSize() throws IOException;
+ long getCurrentNotificationEventId() throws IOException;
+ long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException;
+ List<NotificationEvent> getNextNotificationEvents(
+ long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
+ }
+
+ // MetaStoreClient-based impl of NotificationFetcher
+ public static class MSClientNotificationFetcher implements NotificationFetcher{
+
+ private Hive hiveDb = null;
+ private Integer batchSize = null;
+
+ public MSClientNotificationFetcher(Hive hiveDb){
+ this.hiveDb = hiveDb;
+ }
+
+ @Override
+ public int getBatchSize() throws IOException {
+ if (batchSize == null){
+ try {
+ batchSize = Integer.parseInt(
+ hiveDb.getMSC().getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50"));
+ // TODO: we're asking the metastore what its configuration for this var is - we may
+ // want to revisit to pull from client side instead. The reason I have it this way
+ // is because the metastore is more likely to have a reasonable config for this than
+ // an arbitrary client.
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ }
+ return batchSize;
+ }
+
+ @Override
+ public long getCurrentNotificationEventId() throws IOException {
+ try {
+ return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException {
+ try {
+ NotificationEventsCountRequest rqst
+ = new NotificationEventsCountRequest(fromEventId, dbName);
+ return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount();
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public List<NotificationEvent> getNextNotificationEvents(
+ long pos, IMetaStoreClient.NotificationFilter filter) throws IOException {
+ try {
+ return hiveDb.getMSC().getNextNotification(pos,getBatchSize(), filter).getEvents();
+ } catch (TException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public static class NotificationEventIterator implements Iterator<NotificationEvent> {
+
+ private NotificationFetcher nfetcher;
+ private IMetaStoreClient.NotificationFilter filter;
+ private int maxEvents;
+
+ private Iterator<NotificationEvent> batchIter = null;
+ private List<NotificationEvent> batch = null;
+ private long pos;
+ private long maxPos;
+ private int eventCount;
+
+ public NotificationEventIterator(
+ NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+ String dbName, String tableName) throws IOException {
+ init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName));
+ // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
+ // is an operation that needs to run before delegating to the other ctor, and this messes up chaining
+ // ctors
+ }
+
+ public NotificationEventIterator(
+ NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+ IMetaStoreClient.NotificationFilter filter) throws IOException {
+ init(nfetcher,eventFrom,maxEvents,filter);
+ }
+
+ private void init(
+ NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+ IMetaStoreClient.NotificationFilter filter) throws IOException {
+ this.nfetcher = nfetcher;
+ this.filter = filter;
+ this.pos = eventFrom;
+ if (maxEvents < 1){
+ // 0 or -1 implies fetch everything
+ this.maxEvents = Integer.MAX_VALUE;
+ } else {
+ this.maxEvents = maxEvents;
+ }
+
+ this.eventCount = 0;
+ this.maxPos = nfetcher.getCurrentNotificationEventId();
+ }
+
+ private void fetchNextBatch() throws IOException {
+ batch = nfetcher.getNextNotificationEvents(pos, filter);
+ int batchSize = nfetcher.getBatchSize();
+ while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){
+ // no valid events this batch, but we're still not done processing events
+ pos += batchSize;
+ batch = nfetcher.getNextNotificationEvents(pos,filter);
+ }
+
+ if (batch == null){
+ batch = new ArrayList<>();
+ // instantiate empty list so that we don't error out on iterator fetching.
+ // If we're here, then the next check of pos will show our caller that
+ // that we've exhausted our event supply
+ }
+ batchIter = batch.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (eventCount >= maxEvents){
+ // If we've already satisfied the number of events we were supposed to deliver, we end it.
+ return false;
+ }
+ if ((batchIter != null) && (batchIter.hasNext())){
+ // If we have a valid batchIter and it has more elements, return them.
+ return true;
+ }
+ // If we're here, we want more events, and either batchIter is null, or batchIter
+ // has reached the end of the current batch. Let's fetch the next batch.
+ try {
+ fetchNextBatch();
+ } catch (IOException e) {
+ // Regrettable that we have to wrap the IOException into a RuntimeException,
+ // but throwing the exception is the appropriate result here, and hasNext()
+ // signature will only allow RuntimeExceptions. Iterator.hasNext() really
+ // should have allowed IOExceptions
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ // New batch has been fetched. If it's not empty, we have more elements to process.
+ return !batch.isEmpty();
+ }
+
+ @Override
+ public NotificationEvent next() {
+ eventCount++;
+ NotificationEvent ev = batchIter.next();
+ pos = ev.getEventId();
+ return ev;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator");
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
index c35ca44..010f00c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
@@ -26,15 +26,12 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -99,7 +96,7 @@ public class NotificationEventPoll {
}
EventUtils.MSClientNotificationFetcher evFetcher
- = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
+ = new EventUtils.MSClientNotificationFetcher(Hive.get());
lastCheckedEventId = evFetcher.getCurrentNotificationEventId();
LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId);
@@ -135,7 +132,7 @@ public class NotificationEventPoll {
// Get any new notification events that have been since the last time we checked,
// And pass them on to the event handlers.
EventUtils.MSClientNotificationFetcher evFetcher
- = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
+ = new EventUtils.MSClientNotificationFetcher(Hive.get());
EventUtils.NotificationEventIterator evIter =
new EventUtils.NotificationEventIterator(evFetcher, lastCheckedEventId, 0, "*", null);
http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
deleted file mode 100644
index 2b16897..0000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hive.metastore.messaging;
-
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
-import org.apache.thrift.TException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class EventUtils {
-
- public interface NotificationFetcher {
- int getBatchSize() throws IOException;
- long getCurrentNotificationEventId() throws IOException;
- long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException;
- List<NotificationEvent> getNextNotificationEvents(
- long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
- }
-
- // MetaStoreClient-based impl of NotificationFetcher
- public static class MSClientNotificationFetcher implements NotificationFetcher{
-
- private IMetaStoreClient msc = null;
- private Integer batchSize = null;
-
- public MSClientNotificationFetcher(IMetaStoreClient msc){
- this.msc = msc;
- }
-
- @Override
- public int getBatchSize() throws IOException {
- if (batchSize == null){
- try {
- batchSize = Integer.parseInt(
- msc.getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50"));
- // TODO: we're asking the metastore what its configuration for this var is - we may
- // want to revisit to pull from client side instead. The reason I have it this way
- // is because the metastore is more likely to have a reasonable config for this than
- // an arbitrary client.
- } catch (TException e) {
- throw new IOException(e);
- }
- }
- return batchSize;
- }
-
- @Override
- public long getCurrentNotificationEventId() throws IOException {
- try {
- return msc.getCurrentNotificationEventId().getEventId();
- } catch (TException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException {
- try {
- NotificationEventsCountRequest rqst
- = new NotificationEventsCountRequest(fromEventId, dbName);
- return msc.getNotificationEventsCount(rqst).getEventsCount();
- } catch (TException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public List<NotificationEvent> getNextNotificationEvents(
- long pos, IMetaStoreClient.NotificationFilter filter) throws IOException {
- try {
- return msc.getNextNotification(pos,getBatchSize(), filter).getEvents();
- } catch (TException e) {
- throw new IOException(e.getMessage(), e);
- }
- }
- }
-
- public static class NotificationEventIterator implements Iterator<NotificationEvent> {
-
- private NotificationFetcher nfetcher;
- private IMetaStoreClient.NotificationFilter filter;
- private int maxEvents;
-
- private Iterator<NotificationEvent> batchIter = null;
- private List<NotificationEvent> batch = null;
- private long pos;
- private long maxPos;
- private int eventCount;
-
- public NotificationEventIterator(
- NotificationFetcher nfetcher, long eventFrom, int maxEvents,
- String dbName, String tableName) throws IOException {
- init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName));
- // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
- // is an operation that needs to run before delegating to the other ctor, and this messes up chaining
- // ctors
- }
-
- public NotificationEventIterator(
- NotificationFetcher nfetcher, long eventFrom, int maxEvents,
- IMetaStoreClient.NotificationFilter filter) throws IOException {
- init(nfetcher,eventFrom,maxEvents,filter);
- }
-
- private void init(
- NotificationFetcher nfetcher, long eventFrom, int maxEvents,
- IMetaStoreClient.NotificationFilter filter) throws IOException {
- this.nfetcher = nfetcher;
- this.filter = filter;
- this.pos = eventFrom;
- if (maxEvents < 1){
- // 0 or -1 implies fetch everything
- this.maxEvents = Integer.MAX_VALUE;
- } else {
- this.maxEvents = maxEvents;
- }
-
- this.eventCount = 0;
- this.maxPos = nfetcher.getCurrentNotificationEventId();
- }
-
- private void fetchNextBatch() throws IOException {
- batch = nfetcher.getNextNotificationEvents(pos, filter);
- int batchSize = nfetcher.getBatchSize();
- while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){
- // no valid events this batch, but we're still not done processing events
- pos += batchSize;
- batch = nfetcher.getNextNotificationEvents(pos,filter);
- }
-
- if (batch == null){
- batch = new ArrayList<>();
- // instantiate empty list so that we don't error out on iterator fetching.
- // If we're here, then the next check of pos will show our caller that
- // that we've exhausted our event supply
- }
- batchIter = batch.iterator();
- }
-
- @Override
- public boolean hasNext() {
- if (eventCount >= maxEvents){
- // If we've already satisfied the number of events we were supposed to deliver, we end it.
- return false;
- }
- if ((batchIter != null) && (batchIter.hasNext())){
- // If we have a valid batchIter and it has more elements, return them.
- return true;
- }
- // If we're here, we want more events, and either batchIter is null, or batchIter
- // has reached the end of the current batch. Let's fetch the next batch.
- try {
- fetchNextBatch();
- } catch (IOException e) {
- // Regrettable that we have to wrap the IOException into a RuntimeException,
- // but throwing the exception is the appropriate result here, and hasNext()
- // signature will only allow RuntimeExceptions. Iterator.hasNext() really
- // should have allowed IOExceptions
- throw new RuntimeException(e.getMessage(), e);
- }
- // New batch has been fetched. If it's not empty, we have more elements to process.
- return !batch.isEmpty();
- }
-
- @Override
- public NotificationEvent next() {
- eventCount++;
- NotificationEvent ev = batchIter.next();
- pos = ev.getEventId();
- return ev;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator");
- }
-
- }
-}