You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/30 12:05:37 UTC

hive git commit: HIVE-20209: Metastore connection fails for first attempt in repl dump (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)

Repository: hive
Updated Branches:
  refs/heads/master d3b036920 -> 1203ee834


HIVE-20209: Metastore connection fails for first attempt in repl dump (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1203ee83
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1203ee83
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1203ee83

Branch: refs/heads/master
Commit: 1203ee834d709d4710fd6a41daaeb6da48c4d8f6
Parents: d3b0369
Author: Sankar Hariappan <sa...@apache.org>
Authored: Mon Jul 30 17:35:21 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Mon Jul 30 17:35:21 2018 +0530

----------------------------------------------------------------------
 .../api/repl/ReplicationV1CompatRule.java       |  29 ++-
 .../hive/ql/parse/TestReplicationScenarios.java |   2 +-
 .../ql/cache/results/QueryResultsCache.java     |   4 -
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |  31 +--
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   5 +-
 .../hive/ql/metadata/events/EventUtils.java     | 203 +++++++++++++++++++
 .../metadata/events/NotificationEventPoll.java  |   7 +-
 .../hive/metastore/messaging/EventUtils.java    | 202 ------------------
 8 files changed, 230 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
index d5e227c..550a5e5 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/ReplicationV1CompatRule.java
@@ -22,8 +22,10 @@ import com.google.common.primitives.Ints;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
 import org.apache.hive.hcatalog.api.HCatClient;
 import org.apache.hive.hcatalog.api.HCatNotificationEvent;
 import org.apache.thrift.TException;
@@ -43,6 +45,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Utility class to enable testing of Replv1 compatibility testing.
  *
@@ -65,6 +70,8 @@ public class ReplicationV1CompatRule implements TestRule {
   private HiveConf hconf = null;
   private List<String> testsToSkip = null;
 
+  private Hive hiveDb;
+
   public ReplicationV1CompatRule(IMetaStoreClient metaStoreClient, HiveConf hconf){
     this(metaStoreClient, hconf, new ArrayList<String>());
   }
@@ -79,6 +86,7 @@ public class ReplicationV1CompatRule implements TestRule {
     };
     this.testsToSkip = testsToSkip;
     LOG.info("Replv1 backward compatibility tester initialized at " + testEventId.get());
+    this.hiveDb = mock(Hive.class);
   }
 
   private Long getCurrentNotificationId(){
@@ -137,16 +145,17 @@ public class ReplicationV1CompatRule implements TestRule {
             return true;
           }
         };
-    EventUtils.MSClientNotificationFetcher evFetcher =
-        new EventUtils.MSClientNotificationFetcher(metaStoreClient);
     try {
+      when(hiveDb.getMSC()).thenReturn(metaStoreClient);
+      EventUtils.MSClientNotificationFetcher evFetcher =
+              new EventUtils.MSClientNotificationFetcher(hiveDb);
       EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
-          evFetcher, testEventIdBefore,
-          Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1,
-          evFilter);
+              evFetcher, testEventIdBefore,
+              Ints.checkedCast(testEventIdAfter - testEventIdBefore) + 1,
+              evFilter);
       ReplicationTask.resetFactory(null);
-      assertTrue("We should have found some events",evIter.hasNext());
-      while (evIter.hasNext()){
+      assertTrue("We should have found some events", evIter.hasNext());
+      while (evIter.hasNext()) {
         eventCount++;
         NotificationEvent ev = evIter.next();
         // convert to HCatNotificationEvent, and then try to instantiate a ReplicationTask on it.
@@ -155,11 +164,11 @@ public class ReplicationV1CompatRule implements TestRule {
           if (rtask instanceof ErroredReplicationTask) {
             unhandledTasks.put(ev, ((ErroredReplicationTask) rtask).getCause());
           }
-        } catch (RuntimeException re){
+        } catch (RuntimeException re) {
           incompatibleTasks.put(ev, re);
         }
       }
-    } catch (IOException e) {
+    } catch (IOException | MetaException e) {
       assertNull("Got an exception when we shouldn't have - replv1 backward incompatibility issue:",e);
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index f0098bf..3d509f3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1569,7 +1569,7 @@ public class TestReplicationScenarios {
     run("USE " + replDbName, driverMirror);
     verifyRunWithPatternMatch("SHOW TABLE EXTENDED LIKE namelist PARTITION (year=1990,month=5,day=25)",
             "location", "namelist/year=1990/month=5/day=25", driverMirror);
-    run("USE " + dbName, driverMirror);
+    run("USE " + dbName, driver);
 
     String[] ptn_data_3 = new String[] { "abraham", "bob", "carter", "david", "fisher" };
     String[] data_after_ovwrite = new String[] { "fisher" };

http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 44f7bf8..1ca7c11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -37,14 +37,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
@@ -62,13 +60,11 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.events.EventConsumer;

http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 9a5e6df..f24bcbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
-import com.google.common.primitives.Ints;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -33,7 +31,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
@@ -48,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
@@ -161,7 +159,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
 
     EventUtils.MSClientNotificationFetcher evFetcher
-        = new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+        = new EventUtils.MSClientNotificationFetcher(getHive());
 
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
         evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
@@ -267,30 +265,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       replLogger.endLog(bootDumpBeginReplId.toString());
     }
     Long bootDumpEndReplId = currentNotificationId(hiveDb);
-    LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
-        bootDumpEndReplId);
-
-    // Now that bootstrap has dumped all objects related, we have to account for the changes
-    // that occurred while bootstrap was happening - i.e. we have to look through all events
-    // during the bootstrap period and consolidate them with our dump.
-
-    IMetaStoreClient.NotificationFilter evFilter =
-        new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
-    EventUtils.MSClientNotificationFetcher evFetcher =
-        new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC());
-    EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
-        evFetcher, bootDumpBeginReplId,
-        Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
-        evFilter);
-
-    // Now we consolidate all the events that happenned during the objdump into the objdump
-    while (evIter.hasNext()) {
-      NotificationEvent ev = evIter.next();
-      Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
-      // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
-    }
-    LOG.info(
-        "Consolidation done, preparing to return {},{}->{}",
+    LOG.info("Preparing to return {},{}->{}",
         dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
     dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
     dmd.write();

http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e577f5e..239a606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -400,9 +400,8 @@ public class Hive {
       metaStoreClient.close();
       metaStoreClient = null;
     }
-    if (syncMetaStoreClient != null) {
-      syncMetaStoreClient.close();
-    }
+    // syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to close it once.
+    syncMetaStoreClient = null;
     if (owner != null) {
       owner = null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
new file mode 100644
index 0000000..66abd51
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.ql.metadata.events;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class EventUtils {
+
+  public interface NotificationFetcher {
+    int getBatchSize() throws IOException;
+    long getCurrentNotificationEventId() throws IOException;
+    long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException;
+    List<NotificationEvent> getNextNotificationEvents(
+        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
+  }
+
+  // MetaStoreClient-based impl of NotificationFetcher
+  public static class MSClientNotificationFetcher implements  NotificationFetcher{
+
+    private Hive hiveDb = null;
+    private Integer batchSize = null;
+
+    public MSClientNotificationFetcher(Hive hiveDb){
+      this.hiveDb = hiveDb;
+    }
+
+    @Override
+    public int getBatchSize() throws IOException {
+      if (batchSize == null){
+        try {
+          batchSize = Integer.parseInt(
+                  hiveDb.getMSC().getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50"));
+          // TODO: we're asking the metastore what its configuration for this var is - we may
+          // want to revisit to pull from client side instead. The reason I have it this way
+          // is because the metastore is more likely to have a reasonable config for this than
+          // an arbitrary client.
+        } catch (TException e) {
+          throw new IOException(e);
+        }
+      }
+      return batchSize;
+    }
+
+    @Override
+    public long getCurrentNotificationEventId() throws IOException {
+      try {
+        return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException {
+      try {
+        NotificationEventsCountRequest rqst
+                = new NotificationEventsCountRequest(fromEventId, dbName);
+        return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount();
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public List<NotificationEvent> getNextNotificationEvents(
+        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException {
+      try {
+        return hiveDb.getMSC().getNextNotification(pos,getBatchSize(), filter).getEvents();
+      } catch (TException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+    }
+  }
+
+  public static class NotificationEventIterator implements Iterator<NotificationEvent> {
+
+    private NotificationFetcher nfetcher;
+    private IMetaStoreClient.NotificationFilter filter;
+    private int maxEvents;
+
+    private Iterator<NotificationEvent> batchIter = null;
+    private List<NotificationEvent> batch = null;
+    private long pos;
+    private long maxPos;
+    private int eventCount;
+
+    public NotificationEventIterator(
+        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+        String dbName, String tableName) throws IOException {
+      init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName));
+      // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
+      // is an operation that needs to run before delegating to the other ctor, and this messes up chaining
+      // ctors
+    }
+
+    public NotificationEventIterator(
+        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+        IMetaStoreClient.NotificationFilter filter) throws IOException {
+      init(nfetcher,eventFrom,maxEvents,filter);
+    }
+
+    private void init(
+        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+        IMetaStoreClient.NotificationFilter filter) throws IOException {
+      this.nfetcher = nfetcher;
+      this.filter = filter;
+      this.pos = eventFrom;
+      if (maxEvents < 1){
+        // 0 or -1 implies fetch everything
+        this.maxEvents = Integer.MAX_VALUE;
+      } else {
+        this.maxEvents = maxEvents;
+      }
+
+      this.eventCount = 0;
+      this.maxPos = nfetcher.getCurrentNotificationEventId();
+    }
+
+    private void fetchNextBatch() throws IOException {
+      batch = nfetcher.getNextNotificationEvents(pos, filter);
+      int batchSize = nfetcher.getBatchSize();
+      while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){
+        // no valid events this batch, but we're still not done processing events
+        pos += batchSize;
+        batch = nfetcher.getNextNotificationEvents(pos,filter);
+      }
+
+      if (batch == null){
+        batch = new ArrayList<>();
+        // instantiate empty list so that we don't error out on iterator fetching.
+        // If we're here, then the next check of pos will show our caller that
+        // that we've exhausted our event supply
+      }
+      batchIter = batch.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (eventCount >= maxEvents){
+        // If we've already satisfied the number of events we were supposed to deliver, we end it.
+        return false;
+      }
+      if ((batchIter != null) && (batchIter.hasNext())){
+        // If we have a valid batchIter and it has more elements, return them.
+        return true;
+      }
+      // If we're here, we want more events, and either batchIter is null, or batchIter
+      // has reached the end of the current batch. Let's fetch the next batch.
+      try {
+        fetchNextBatch();
+      } catch (IOException e) {
+        // Regrettable that we have to wrap the IOException into a RuntimeException,
+        // but throwing the exception is the appropriate result here, and hasNext()
+        // signature will only allow RuntimeExceptions. Iterator.hasNext() really
+        // should have allowed IOExceptions
+        throw new RuntimeException(e.getMessage(), e);
+      }
+      // New batch has been fetched. If it's not empty, we have more elements to process.
+      return !batch.isEmpty();
+    }
+
+    @Override
+    public NotificationEvent next() {
+      eventCount++;
+      NotificationEvent ev = batchIter.next();
+      pos = ev.getEventId();
+      return ev;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator");
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
index c35ca44..010f00c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java
@@ -26,15 +26,12 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -99,7 +96,7 @@ public class NotificationEventPoll {
     }
 
     EventUtils.MSClientNotificationFetcher evFetcher
-        = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
+        = new EventUtils.MSClientNotificationFetcher(Hive.get());
     lastCheckedEventId = evFetcher.getCurrentNotificationEventId();
     LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId);
 
@@ -135,7 +132,7 @@ public class NotificationEventPoll {
         // Get any new notification events that have been since the last time we checked,
         // And pass them on to the event handlers.
         EventUtils.MSClientNotificationFetcher evFetcher
-            = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC());
+            = new EventUtils.MSClientNotificationFetcher(Hive.get());
         EventUtils.NotificationEventIterator evIter =
             new EventUtils.NotificationEventIterator(evFetcher, lastCheckedEventId, 0, "*", null);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1203ee83/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
deleted file mode 100644
index 2b16897..0000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hive.metastore.messaging;
-
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
-import org.apache.thrift.TException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class EventUtils {
-
-  public interface NotificationFetcher {
-    int getBatchSize() throws IOException;
-    long getCurrentNotificationEventId() throws IOException;
-    long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException;
-    List<NotificationEvent> getNextNotificationEvents(
-        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
-  }
-
-  // MetaStoreClient-based impl of NotificationFetcher
-  public static class MSClientNotificationFetcher implements  NotificationFetcher{
-
-    private IMetaStoreClient msc = null;
-    private Integer batchSize = null;
-
-    public MSClientNotificationFetcher(IMetaStoreClient msc){
-      this.msc = msc;
-    }
-
-    @Override
-    public int getBatchSize() throws IOException {
-      if (batchSize == null){
-        try {
-          batchSize = Integer.parseInt(
-            msc.getConfigValue(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.toString(), "50"));
-          // TODO: we're asking the metastore what its configuration for this var is - we may
-          // want to revisit to pull from client side instead. The reason I have it this way
-          // is because the metastore is more likely to have a reasonable config for this than
-          // an arbitrary client.
-        } catch (TException e) {
-          throw new IOException(e);
-        }
-      }
-      return batchSize;
-    }
-
-    @Override
-    public long getCurrentNotificationEventId() throws IOException {
-      try {
-        return msc.getCurrentNotificationEventId().getEventId();
-      } catch (TException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException {
-      try {
-        NotificationEventsCountRequest rqst
-                = new NotificationEventsCountRequest(fromEventId, dbName);
-        return msc.getNotificationEventsCount(rqst).getEventsCount();
-      } catch (TException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public List<NotificationEvent> getNextNotificationEvents(
-        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException {
-      try {
-        return msc.getNextNotification(pos,getBatchSize(), filter).getEvents();
-      } catch (TException e) {
-        throw new IOException(e.getMessage(), e);
-      }
-    }
-  }
-
-  public static class NotificationEventIterator implements Iterator<NotificationEvent> {
-
-    private NotificationFetcher nfetcher;
-    private IMetaStoreClient.NotificationFilter filter;
-    private int maxEvents;
-
-    private Iterator<NotificationEvent> batchIter = null;
-    private List<NotificationEvent> batch = null;
-    private long pos;
-    private long maxPos;
-    private int eventCount;
-
-    public NotificationEventIterator(
-        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
-        String dbName, String tableName) throws IOException {
-      init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName));
-      // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
-      // is an operation that needs to run before delegating to the other ctor, and this messes up chaining
-      // ctors
-    }
-
-    public NotificationEventIterator(
-        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
-        IMetaStoreClient.NotificationFilter filter) throws IOException {
-      init(nfetcher,eventFrom,maxEvents,filter);
-    }
-
-    private void init(
-        NotificationFetcher nfetcher, long eventFrom, int maxEvents,
-        IMetaStoreClient.NotificationFilter filter) throws IOException {
-      this.nfetcher = nfetcher;
-      this.filter = filter;
-      this.pos = eventFrom;
-      if (maxEvents < 1){
-        // 0 or -1 implies fetch everything
-        this.maxEvents = Integer.MAX_VALUE;
-      } else {
-        this.maxEvents = maxEvents;
-      }
-
-      this.eventCount = 0;
-      this.maxPos = nfetcher.getCurrentNotificationEventId();
-    }
-
-    private void fetchNextBatch() throws IOException {
-      batch = nfetcher.getNextNotificationEvents(pos, filter);
-      int batchSize = nfetcher.getBatchSize();
-      while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){
-        // no valid events this batch, but we're still not done processing events
-        pos += batchSize;
-        batch = nfetcher.getNextNotificationEvents(pos,filter);
-      }
-
-      if (batch == null){
-        batch = new ArrayList<>();
-        // instantiate empty list so that we don't error out on iterator fetching.
-        // If we're here, then the next check of pos will show our caller that
-        // that we've exhausted our event supply
-      }
-      batchIter = batch.iterator();
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (eventCount >= maxEvents){
-        // If we've already satisfied the number of events we were supposed to deliver, we end it.
-        return false;
-      }
-      if ((batchIter != null) && (batchIter.hasNext())){
-        // If we have a valid batchIter and it has more elements, return them.
-        return true;
-      }
-      // If we're here, we want more events, and either batchIter is null, or batchIter
-      // has reached the end of the current batch. Let's fetch the next batch.
-      try {
-        fetchNextBatch();
-      } catch (IOException e) {
-        // Regrettable that we have to wrap the IOException into a RuntimeException,
-        // but throwing the exception is the appropriate result here, and hasNext()
-        // signature will only allow RuntimeExceptions. Iterator.hasNext() really
-        // should have allowed IOExceptions
-        throw new RuntimeException(e.getMessage(), e);
-      }
-      // New batch has been fetched. If it's not empty, we have more elements to process.
-      return !batch.isEmpty();
-    }
-
-    @Override
-    public NotificationEvent next() {
-      eventCount++;
-      NotificationEvent ev = batchIter.next();
-      pos = ev.getEventId();
-      return ev;
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator");
-    }
-
-  }
-}