You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/08 22:04:49 UTC

[1/2] impala git commit: IMPALA-7386. Replace CatalogObjectVersionQueue with a multiset

Repository: impala
Updated Branches:
  refs/heads/master a6c356850 -> 3c9fef2ae


IMPALA-7386. Replace CatalogObjectVersionQueue with a multiset

The implementation of CatalogObjectVersionQueue was based on a priority
queue, which is not a good choice of data structure for the use case.
This class exposes the following operations, with corresponding runtimes
for the heap:

- addVersion: O(lg n)
- removeVersion: O(n)
- getMinVersion: O(1)

Given that every update of a catalog object requires removing the old
version, the O(n) runtime could get quite expensive, particularly for
operations like INVALIDATE METADATA which end up removing the old
version of all of the objects in the catalog -- thus becoming
accidentally quadratic.

This patch switches to a TreeMultiset coupled with a simple cache of
the minimum element, with runtimes:

- addVersion: O(lg n)
- removeVersion: O(lg n)
- getMinVersion: O(1)

The downside of this patch is increased memory overhead: we are going
from a PriorityQueue which has 8 bytes overhead per element to a
TreeMultiset which has 48 bytes overhead per element. In a catalog with
millions of tables this won't be insignificant; however, in such a
catalog the performance savings are probably critical. According to
a quick JMH benchmark, removal of an element from a PriorityQueue with
1M elements takes about 3.5ms, so an operation which invalidates all
million tables would take about 3500 seconds without this patch.
Meanwhile, the same benchmark using a TreeSet takes about 28ns per
removal, so the whole invalidation would take about 28ms.

This patch also makes the data structure synchronized, just for
simplicity's sake, since uncontended locks are very cheap in Java.

Change-Id: I1b6c58a1acef9b02fcc26a04d048ee9cc47dc0ef
Reviewed-on: http://gerrit.cloudera.org:8080/11109
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b9d44aff
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b9d44aff
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b9d44aff

Branch: refs/heads/master
Commit: b9d44aff3ee5850dc7f08ba61091c959f08bc474
Parents: a6c3568
Author: Todd Lipcon <to...@cloudera.com>
Authored: Wed Aug 1 17:56:25 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 8 21:28:32 2018 +0000

----------------------------------------------------------------------
 .../impala/catalog/AuthorizationPolicy.java     |   2 +-
 .../impala/catalog/CatalogObjectCache.java      |   8 +-
 .../catalog/CatalogObjectVersionQueue.java      |  75 --------------
 .../impala/catalog/CatalogObjectVersionSet.java | 102 +++++++++++++++++++
 .../apache/impala/catalog/ImpaladCatalog.java   |  28 ++---
 .../catalog/CatalogObjectVersionSetTest.java    | 102 +++++++++++++++++++
 6 files changed, 223 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b9d44aff/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
index 1c84973..8c545fc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
@@ -86,7 +86,7 @@ public class AuthorizationPolicy implements PrivilegeCache {
     if (existingRole != null) {
       // Remove the role. This will also clean up the grantGroup mappings.
       removeRole(existingRole.getName());
-      CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges());
+      CatalogObjectVersionSet.INSTANCE.removeAll(existingRole.getPrivileges());
       if (existingRole.getId() == role.getId()) {
         // Copy the privileges from the existing role.
         for (RolePrivilege p: existingRole.getPrivileges()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/b9d44aff/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
index d882cdb..38698c5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
@@ -31,7 +31,7 @@ import com.google.common.collect.Lists;
  * Thread safe cache for storing CatalogObjects. Enforces that updates to existing
  * entries only get applied if the new/updated object has a larger catalog version.
  * add() and remove() functions also update the entries of the global instance of
- * CatalogObjectVersionQueue which keeps track of the catalog objects versions.
+ * CatalogObjectVersionSet which keeps track of the catalog objects versions.
  */
 public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> {
   private final boolean caseInsensitiveKeys_;
@@ -74,7 +74,7 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
     if (caseInsensitiveKeys_) key = key.toLowerCase();
     T existingItem = metadataCache_.putIfAbsent(key, catalogObject);
     if (existingItem == null) {
-      CatalogObjectVersionQueue.INSTANCE.addVersion(
+      CatalogObjectVersionSet.INSTANCE.addVersion(
           catalogObject.getCatalogVersion());
       return true;
     }
@@ -84,7 +84,7 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
       // associated with the key. Add the updated object iff it has a catalog
       // version greater than the existing entry.
       metadataCache_.put(key, catalogObject);
-      CatalogObjectVersionQueue.INSTANCE.updateVersions(
+      CatalogObjectVersionSet.INSTANCE.updateVersions(
           existingItem.getCatalogVersion(), catalogObject.getCatalogVersion());
       return true;
     }
@@ -99,7 +99,7 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
     if (caseInsensitiveKeys_) name = name.toLowerCase();
     T removedObject = metadataCache_.remove(name);
     if (removedObject != null) {
-      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+      CatalogObjectVersionSet.INSTANCE.removeVersion(
           removedObject.getCatalogVersion());
     }
     return removedObject;

http://git-wip-us.apache.org/repos/asf/impala/blob/b9d44aff/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
deleted file mode 100644
index e24e66b..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog;
-
-import java.util.List;
-import java.util.PriorityQueue;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Singleton class used to maintain the versions of all the catalog objects stored in a
- * local catalog cache. Simple wrapper around a priority queue which stores the catalog
- * object versions, allowing O(1) retrieval of the minimum object version currently
- * stored in the cache. Provides a simple API to add, remove and update catalog object
- * versions. Not thread-safe.
- *
- * The primary use case of this class is to allow an Impalad catalog cache determine when
- * the result set of an INVALIDATE METADATA operation has been applied locally by keeping
- * track of the minimum catalog object version.
- */
-public class CatalogObjectVersionQueue {
-  private final PriorityQueue<Long> objectVersions_ = new PriorityQueue<>();
-
-  public static final CatalogObjectVersionQueue INSTANCE =
-      new CatalogObjectVersionQueue();
-
-  private CatalogObjectVersionQueue() {}
-
-  public void updateVersions(long oldVersion, long newVersion) {
-    removeVersion(oldVersion);
-    addVersion(newVersion);
-  }
-
-  public void removeVersion(long oldVersion) {
-    objectVersions_.remove(oldVersion);
-  }
-
-  public void addVersion(long newVersion) {
-    objectVersions_.add(newVersion);
-  }
-
-  public long getMinimumVersion() {
-    Long minVersion = objectVersions_.peek();
-    return minVersion != null ? minVersion : 0;
-  }
-
-  public void addAll(List<? extends CatalogObject> catalogObjects) {
-    for (CatalogObject catalogObject: catalogObjects) {
-      addVersion(catalogObject.getCatalogVersion());
-    }
-  }
-
-  public void removeAll(List<? extends CatalogObject> catalogObjects) {
-    for (CatalogObject catalogObject: catalogObjects) {
-      removeVersion(catalogObject.getCatalogVersion());
-    }
-  }
-
-  public void clear() { objectVersions_.clear(); }
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/b9d44aff/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionSet.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionSet.java
new file mode 100644
index 0000000..4b73f17
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionSet.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Multiset.Entry;
+import com.google.common.collect.TreeMultiset;
+
+/**
+ * Singleton class used to maintain the versions of all the catalog objects stored in a
+ * local catalog cache. Simple wrapper around a multiset which stores the catalog
+ * object versions. This allows O(lg n) addition or removal of objects as well as
+ * O(1) retrieval of the minimum object version currently stored in the cache.
+ *
+ * Provides a simple API to add, remove and update catalog object
+ * versions. Thread-safe.
+ *
+ * The primary use case of this class is to allow an Impalad catalog cache to determine
+ * when the result set of an INVALIDATE METADATA operation has been applied locally by
+ * keeping track of the minimum catalog object version.
+ */
+public class CatalogObjectVersionSet {
+  // TODO(todd): it's likely that we should only have exactly one object at each
+  // version, in which case this should be a set, instead of a multiset. We should
+  // add appropriate preconditions checks and make this change.
+  private final TreeMultiset<Long> objectVersions_ = TreeMultiset.create();
+  /**
+   * Cache of the minimum element in objectVersions_. This provides O(1) runtime
+   * for the getMinimumVersion() call.
+   */
+  private long minVersion_ = Catalog.INITIAL_CATALOG_VERSION;
+
+  public static final CatalogObjectVersionSet INSTANCE =
+      new CatalogObjectVersionSet();
+
+  @VisibleForTesting
+  CatalogObjectVersionSet() {}
+
+  public synchronized void updateVersions(long oldVersion, long newVersion) {
+    removeVersion(oldVersion);
+    addVersion(newVersion);
+  }
+
+  public synchronized void removeVersion(long oldVersion) {
+    int oldCount = objectVersions_.remove(oldVersion, 1);
+    // If we removed the minimum version, and no other entries exist with
+    // the same min version, we need to update it.
+    if (oldCount == 1 && oldVersion == minVersion_) {
+      Entry<Long> entry = objectVersions_.firstEntry();
+      if (entry != null) {
+        minVersion_ = entry.getElement();
+      } else {
+        minVersion_ = Catalog.INITIAL_CATALOG_VERSION;
+      }
+    }
+  }
+
+  public synchronized void addVersion(long newVersion) {
+    if (objectVersions_.isEmpty() || newVersion < minVersion_) {
+      minVersion_ = newVersion;
+    }
+    objectVersions_.add(newVersion);
+  }
+
+  public synchronized long getMinimumVersion() {
+    return minVersion_;
+  }
+
+  public void addAll(List<? extends CatalogObject> catalogObjects) {
+    for (CatalogObject catalogObject: catalogObjects) {
+      addVersion(catalogObject.getCatalogVersion());
+    }
+  }
+
+  public void removeAll(List<? extends CatalogObject> catalogObjects) {
+    for (CatalogObject catalogObject: catalogObjects) {
+      removeVersion(catalogObject.getCatalogVersion());
+    }
+  }
+
+  public synchronized void clear() {
+    minVersion_ = Catalog.INITIAL_CATALOG_VERSION;
+    objectVersions_.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/b9d44aff/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index db05cb7..30c34ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -100,9 +100,9 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
     super();
     addDb(BuiltinsDb.getInstance());
     defaultKuduMasterHosts_ = defaultKuduMasterHosts;
-    // Ensure the contents of the CatalogObjectVersionQueue instance are cleared when a
+    // Ensure the contents of the CatalogObjectVersionSet instance are cleared when a
     // new instance of ImpaladCatalog is created (see IMPALA-6486).
-    CatalogObjectVersionQueue.INSTANCE.clear();
+    CatalogObjectVersionSet.INSTANCE.clear();
   }
 
   /**
@@ -211,7 +211,7 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
       catalogUpdateEventNotifier_.notifyAll();
     }
     return new TUpdateCatalogCacheResponse(catalogServiceId_,
-        CatalogObjectVersionQueue.INSTANCE.getMinimumVersion(), newCatalogVersion);
+        CatalogObjectVersionSet.INSTANCE.getMinimumVersion(), newCatalogVersion);
   }
 
 
@@ -362,13 +362,13 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
       newDb.setCatalogVersion(catalogVersion);
       addDb(newDb);
       if (existingDb != null) {
-        CatalogObjectVersionQueue.INSTANCE.updateVersions(
+        CatalogObjectVersionSet.INSTANCE.updateVersions(
             existingDb.getCatalogVersion(), catalogVersion);
-        CatalogObjectVersionQueue.INSTANCE.removeAll(existingDb.getTables());
-        CatalogObjectVersionQueue.INSTANCE.removeAll(
+        CatalogObjectVersionSet.INSTANCE.removeAll(existingDb.getTables());
+        CatalogObjectVersionSet.INSTANCE.removeAll(
             existingDb.getFunctions(null, new PatternMatcher()));
       } else {
-        CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+        CatalogObjectVersionSet.INSTANCE.addVersion(catalogVersion);
       }
     }
   }
@@ -405,10 +405,10 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
         existingFn.getCatalogVersion() < catalogVersion) {
       db.addFunction(function);
       if (existingFn != null) {
-        CatalogObjectVersionQueue.INSTANCE.updateVersions(
+        CatalogObjectVersionSet.INSTANCE.updateVersions(
             existingFn.getCatalogVersion(), catalogVersion);
       } else {
-        CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+        CatalogObjectVersionSet.INSTANCE.addVersion(catalogVersion);
       }
     }
   }
@@ -432,10 +432,10 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
     Db db = getDb(thriftDb.getDb_name());
     if (db != null && db.getCatalogVersion() < dropCatalogVersion) {
       removeDb(db.getName());
-      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+      CatalogObjectVersionSet.INSTANCE.removeVersion(
           db.getCatalogVersion());
-      CatalogObjectVersionQueue.INSTANCE.removeAll(db.getTables());
-      CatalogObjectVersionQueue.INSTANCE.removeAll(
+      CatalogObjectVersionSet.INSTANCE.removeAll(db.getTables());
+      CatalogObjectVersionSet.INSTANCE.removeAll(
           db.getFunctions(null, new PatternMatcher()));
     }
   }
@@ -462,7 +462,7 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
     if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) {
       LibCacheRemoveEntry(fn.getLocation().getLocation());
       db.removeFunction(thriftFn.getSignature());
-      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+      CatalogObjectVersionSet.INSTANCE.removeVersion(
           fn.getCatalogVersion());
     }
   }
@@ -472,7 +472,7 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
     // version of the drop, remove the function.
     if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) {
       authPolicy_.removeRole(thriftRole.getRole_name());
-      CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges());
+      CatalogObjectVersionSet.INSTANCE.removeAll(existingRole.getPrivileges());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b9d44aff/fe/src/test/java/org/apache/impala/catalog/CatalogObjectVersionSetTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectVersionSetTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectVersionSetTest.java
new file mode 100644
index 0000000..3275317
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectVersionSetTest.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+/**
+ * Perform various operations on CatalogObjectVersionSet while applying the
+ * same operations to a simple O(n) list structure. The semantics should be
+ * the same.
+ */
+public class CatalogObjectVersionSetTest {
+  private CatalogObjectVersionSet set_ = new CatalogObjectVersionSet();
+  private List<Long> list_ = new ArrayList<>();
+
+  private void doAdd(long v) {
+    set_.addVersion(v);
+    list_.add(v);
+    checkConsistency();
+  }
+
+  private void doRemove(long v) {
+    set_.removeVersion(v);
+    list_.remove(v);
+    checkConsistency();
+  }
+
+  private void doUpdate(long from, long to) {
+    set_.updateVersions(from, to);
+    list_.remove(from);
+    list_.add(to);
+    checkConsistency();
+  }
+
+  private void checkConsistency() {
+    // We only need to check that the minimum element API returns the correct result
+    // matching the minimum in the built-in collection implementation. The
+    // CatalogObjectVersionSet doesn't expose other APIs like iteration, contains, etc.
+    if (list_.isEmpty()) {
+      assertEquals(0, set_.getMinimumVersion());
+    } else {
+      assertEquals((long)Collections.min(list_), set_.getMinimumVersion());
+    }
+  }
+
+  @Test
+  public void testAddRemove() {
+    assertEquals(0, set_.getMinimumVersion());
+    doAdd(10);
+    doAdd(20);
+    doAdd(5);
+    // Another entry matching the minimum.
+    doAdd(5);
+    // Removing it once should still leave a second copy of the element there.
+    doRemove(5);
+
+    // Removing a second time should yield the next minimum version.
+    doRemove(5);
+    doRemove(10);
+    doRemove(20);
+  }
+
+  @Test
+  public void testUpdate() {
+    doAdd(10);
+    doUpdate(10, 20);
+    doUpdate(20, 30);
+    doUpdate(30, 10);
+    doAdd(10);
+    doUpdate(10, 20);
+    doRemove(10);
+    doRemove(20);
+  }
+
+  @Test
+  public void testRemoveNonExistentVersion() {
+    // This currently does not throw.
+    doRemove(10);
+  }
+
+}


[2/2] impala git commit: Switch a couple of lists to deques in AnalyticEvalNode

Posted by ta...@apache.org.
Switch a couple of lists to deques in AnalyticEvalNode

I noticed this inefficiency while looking at this code.

std::deque generally offers better performance because it does fewer
memory allocations and has better memory locality. The main advantages
of std::list are O(1) insert/delete in the middle of the list and
stable iterators that remain valid through list modifications,
but neither property is useful for result_tuples_ and
window_tuples_, because we only append at the back, remove from the front,
and iterate over the whole container at once.

Change-Id: Iaa716dcf241a0a9e4f5221e5cf7a1596c75aecc0
Reviewed-on: http://gerrit.cloudera.org:8080/11159
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3c9fef2a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3c9fef2a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3c9fef2a

Branch: refs/heads/master
Commit: 3c9fef2aeebcf65c8b87f6e9abe744ab89198828
Parents: b9d44af
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Aug 7 11:58:07 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 8 22:03:07 2018 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc | 22 ++++++++++------------
 be/src/exec/analytic-eval-node.h  |  5 +++--
 2 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3c9fef2a/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index f51a5b5..68170a2 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -295,18 +295,16 @@ string AnalyticEvalNode::DebugStateString(bool detailed = false) const {
      << " last_result_idx=" << last_result_idx_;
   if (detailed) {
     ss << " result_tuples idx: [";
-    for (list<pair<int64_t, Tuple*>>::const_iterator it = result_tuples_.begin();
-        it != result_tuples_.end(); ++it) {
-      ss << it->first;
-      if (*it != result_tuples_.back()) ss << ", ";
+    for (const pair<int64_t, Tuple*>& result_tuple : result_tuples_) {
+      ss << result_tuple.first;
+      if (&result_tuple != &result_tuples_.back()) ss << ", ";
     }
     ss << "]";
     if (fn_scope_ == ROWS && window_.__isset.window_start) {
       ss << " window_tuples idx: [";
-      for (list<pair<int64_t, Tuple*>>::const_iterator it = window_tuples_.begin();
-          it != window_tuples_.end(); ++it) {
-        ss << it->first;
-        if (*it != window_tuples_.back()) ss << ", ";
+    for (const pair<int64_t, Tuple*>& window_tuple : window_tuples_) {
+        ss << window_tuple.first;
+        if (&window_tuple != &window_tuples_.back()) ss << ", ";
       }
       ss << "]";
     }
@@ -338,7 +336,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
       VLOG_ROW << id() << " Adding tuple to window at idx=" << stream_idx;
       Tuple* tuple = row->GetTuple(0)->DeepCopy(
           *child(0)->row_desc()->tuple_descriptors()[0], curr_tuple_pool_.get());
-      window_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, tuple));
+      window_tuples_.emplace_back(stream_idx, tuple);
     }
   }
 
@@ -388,7 +386,7 @@ Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   }
 
   DCHECK_GT(stream_idx, last_result_idx_);
-  result_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, result_tuple));
+  result_tuples_.emplace_back(stream_idx, result_tuple);
   last_result_idx_ = stream_idx;
   VLOG_ROW << id() << " Added result tuple, final state: " << DebugStateString(true);
   return Status::OK();
@@ -520,8 +518,8 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
       // prev_partition_last_result_tuple was the last result tuple in the partition, add
       // it back with the index of the last row in the partition so that all output rows
       // in this partition get the correct value.
-      result_tuples_.push_back(pair<int64_t, Tuple*>(curr_partition_idx_ - 1,
-          prev_partition_last_result_tuple));
+      result_tuples_.emplace_back(
+          curr_partition_idx_ - 1, prev_partition_last_result_tuple);
     }
     DCHECK(!result_tuples_.empty());
     last_result_idx_ = result_tuples_.back().first;

http://git-wip-us.apache.org/repos/asf/impala/blob/3c9fef2a/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index bf15ebf..696969a 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -18,6 +18,7 @@
 #ifndef IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
 #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
 
+#include <deque>
 #include <memory>
 
 #include "exec/exec-node.h"
@@ -274,7 +275,7 @@ class AnalyticEvalNode : public ExecNode {
   /// output row and result_tuples_.size() may be one less than the row batch size, in
   /// which case we will process another input row batch (inserting one result tuple per
   /// input row) before returning a row batch.
-  std::list<std::pair<int64_t, Tuple*>> result_tuples_;
+  std::deque<std::pair<int64_t, Tuple*>> result_tuples_;
 
   /// Index in input_stream_ of the most recently added result tuple.
   int64_t last_result_idx_;
@@ -284,7 +285,7 @@ class AnalyticEvalNode : public ExecNode {
   /// or FOLLOWING. Tuples in this list are deep copied and owned by
   /// curr_window_tuple_pool_.
   /// TODO: Remove and use BufferedTupleStream (needs support for multiple readers).
-  std::list<std::pair<int64_t, Tuple*>> window_tuples_;
+  std::deque<std::pair<int64_t, Tuple*>> window_tuples_;
 
   /// The index of the last row from input_stream_ associated with output row containing
   /// resources in prev_tuple_pool_. -1 when the pool is empty. Resources from