You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/01 16:15:37 UTC

[2/3] impala git commit: IMPALA-6583: Wait for non-empty catalog update at impalad startup

IMPALA-6583: Wait for non-empty catalog update at impalad startup

Currently an empty statestore catalog update would enable impalad to
begin compiling queries. It breaks many custom cluster tests. This patch
makes impalad wait for a non-zero catalog version.

Testing: test_single_coordinator_cluster_config is run in a loop for
several hours. Core tests are also run.

Change-Id: Ie5f09d03497543f283b39c6186ecfda2e0097c87
Reviewed-on: http://gerrit.cloudera.org:8080/9458
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/85000c28
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/85000c28
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/85000c28

Branch: refs/heads/2.x
Commit: 85000c2866de79258fdc656f40684c190ed6244d
Parents: 37586a8
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Mon Feb 26 15:00:06 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 1 05:34:02 2018 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   |  4 +++
 .../apache/impala/catalog/ImpaladCatalog.java   | 27 ++++++++++++--------
 .../org/apache/impala/service/Frontend.java     |  5 ++--
 3 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/85000c28/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index a3b0cb0..7ea821c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -1120,6 +1120,10 @@ public class CatalogServiceCatalog extends Catalog {
     reader.run();
 
     versionLock_.writeLock().lock();
+    // In case of an empty new catalog, the version should still change to reflect the
+    // reset operation itself and to unblock impalads by making the catalog version >
+    // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog()
+    ++catalogVersion_;
     // Assign new versions to all the loaded data sources.
     for (DataSource dataSource: getDataSources()) {
       dataSource.setCatalogVersion(incrementAndGetCatalogVersion());

http://git-wip-us.apache.org/repos/asf/impala/blob/85000c28/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 1da5d0b..fce4574 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.TableName;
@@ -84,10 +85,8 @@ public class ImpaladCatalog extends Catalog {
   // all objects in the catalog have at a minimum, this version. Because updates may
   // be applied out of band of a StateStore heartbeat, it is possible the catalog
   // contains some objects > than this version.
-  private long lastSyncedCatalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
-
-  // Flag to determine if the Catalog is ready to accept user requests. See isReady().
-  private final AtomicBoolean isReady_ = new AtomicBoolean(false);
+  private AtomicLong lastSyncedCatalogVersion_ =
+      new AtomicLong(Catalog.INITIAL_CATALOG_VERSION);
 
   // Tracks modifications to this Impalad's catalog from direct updates to the cache.
   private final CatalogDeltaLog catalogDeltaLog_ = new CatalogDeltaLog();
@@ -159,7 +158,7 @@ public class ImpaladCatalog extends Catalog {
     if (req.isSetCatalog_service_id()) setCatalogServiceId(req.catalog_service_id);
     ArrayDeque<TCatalogObject> updatedObjects = new ArrayDeque<>();
     ArrayDeque<TCatalogObject> deletedObjects = new ArrayDeque<>();
-    long newCatalogVersion = lastSyncedCatalogVersion_;
+    long newCatalogVersion = lastSyncedCatalogVersion_.get();
     Pair<Boolean, ByteBuffer> update;
     while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
         != null) {
@@ -207,10 +206,9 @@ public class ImpaladCatalog extends Catalog {
 
     for (TCatalogObject catalogObject: deletedObjects) removeCatalogObject(catalogObject);
 
-    lastSyncedCatalogVersion_ = newCatalogVersion;
+    lastSyncedCatalogVersion_.set(newCatalogVersion);
     // Cleanup old entries in the log.
-    catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
-    isReady_.set(true);
+    catalogDeltaLog_.garbageCollect(newCatalogVersion);
     // Notify all the threads waiting on a catalog update.
     synchronized (catalogUpdateEventNotifier_) {
       catalogUpdateEventNotifier_.notifyAll();
@@ -369,7 +367,7 @@ public class ImpaladCatalog extends Catalog {
             "Unexpected TCatalogObjectType: " + catalogObject.getType());
     }
 
-    if (catalogObject.getCatalog_version() > lastSyncedCatalogVersion_) {
+    if (catalogObject.getCatalog_version() > lastSyncedCatalogVersion_.get()) {
       catalogDeltaLog_.addRemovedObject(catalogObject);
     }
   }
@@ -513,10 +511,17 @@ public class ImpaladCatalog extends Catalog {
    * received and processed a valid catalog topic update from the StateStore),
    * false otherwise.
    */
-  public boolean isReady() { return isReady_.get(); }
+  public boolean isReady() {
+    return lastSyncedCatalogVersion_.get() > INITIAL_CATALOG_VERSION;
+  }
 
   // Only used for testing.
-  public void setIsReady(boolean isReady) { isReady_.set(isReady); }
+  public void setIsReady(boolean isReady) {
+    lastSyncedCatalogVersion_.incrementAndGet();
+    synchronized (catalogUpdateEventNotifier_) {
+      catalogUpdateEventNotifier_.notifyAll();
+    }
+  }
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
   public String getDefaultKuduMasterHosts() { return defaultKuduMasterHosts_; }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/85000c28/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index f03feb0..41395d6 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -799,9 +799,10 @@ public class Frontend {
 
   /**
    * Waits indefinitely for the local catalog to be ready. The catalog is "ready" after
-   * the first catalog update is received from the statestore.
+   * the first catalog update with a version > INITIAL_CATALOG_VERSION is received from
+   * the statestore.
    *
-   * @see ImpaladCatalog.isReady
+   * @see ImpaladCatalog#isReady()
    */
   public void waitForCatalog() {
     LOG.info("Waiting for first catalog update from the statestore.");