You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2020/07/23 13:01:17 UTC

[hive] branch master updated: HIVE-23198: Add matching logic between CacheTags and proactive eviction requests (Adam Szita, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 610a663  HIVE-23198: Add matching logic between CacheTags and proactive eviction requests (Adam Szita, reviewed by Peter Vary)
610a663 is described below

commit 610a663e1cf36114339fb36388fcbefbda2a3b97
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Thu Jul 23 15:00:58 2020 +0200

    HIVE-23198: Add matching logic between CacheTags and proactive eviction requests (Adam Szita, reviewed by Peter Vary)
---
 .../hive/llap/cache/TestCacheContentsTracker.java  |  7 +-
 .../hive/llap/cache/TestProactiveEviction.java     | 96 ++++++++++++++++++++++
 .../apache/hadoop/hive/llap/ProactiveEviction.java | 52 +++++++++++-
 .../org/apache/hadoop/hive/common/io/CacheTag.java | 14 ++--
 4 files changed, 157 insertions(+), 12 deletions(-)

diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
index 175cbac..15d3f8f 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.cache;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.io.CacheTag;
@@ -124,7 +125,7 @@ public class TestCacheContentsTracker {
 
   @Test
   public void testEncodingDecoding() throws Exception {
-    Map<String, String> partDescs = new HashMap<>();
+    LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
     partDescs.put("pytha=goras", "a2+b2=c2");
     CacheTag tag = CacheTag.build("math.rules", partDescs);
     CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag);
@@ -166,9 +167,9 @@ public class TestCacheContentsTracker {
     return llapCacheableBufferMock;
   }
 
-  private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
+  public static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
     if (partitions != null && partitions.length > 0) {
-      Map<String, String> partDescs = new HashMap<>();
+      LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
       for (String partition : partitions) {
         String[] partDesc = partition.split("=");
         partDescs.put(partDesc[0], partDesc[1]);
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
new file mode 100644
index 0000000..689e27f
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.llap.ProactiveEviction;
+import org.apache.hadoop.hive.llap.ProactiveEviction.Request;
+import org.apache.hadoop.hive.llap.ProactiveEviction.Request.Builder;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.llap.cache.TestCacheContentsTracker.cacheTagBuilder;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases for proactive LLAP cache eviction.
+ */
+public class TestProactiveEviction {
+
+  private static final CacheTag[] TEST_TAGS = new CacheTag[] {
+    cacheTagBuilder("fx.rates", "from=USD", "to=HUF"),
+    cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),
+    cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),
+    cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),
+    cacheTagBuilder("fx.rates", "from=EUR", "to=HUF"),
+    cacheTagBuilder("fx.futures", "ccy=EUR"),
+    cacheTagBuilder("fx.futures", "ccy=JPY"),
+    cacheTagBuilder("fx.futures", "ccy=JPY"),
+    cacheTagBuilder("fx.futures", "ccy=USD"),
+    cacheTagBuilder("fx.centralbanks"),
+    cacheTagBuilder("fx.centralbanks"),
+    cacheTagBuilder("fx.centralbanks"),
+    cacheTagBuilder("equity.prices", "ex=NYSE"),
+    cacheTagBuilder("equity.prices", "ex=NYSE"),
+    cacheTagBuilder("equity.prices", "ex=NASDAQ"),
+    cacheTagBuilder("fixedincome.bonds"),
+    cacheTagBuilder("fixedincome.bonds"),
+    cacheTagBuilder("fixedincome.yieldcurves")
+  };
+
+  @Test
+  public void testCachetagAndRequestMatching() throws Exception {
+    assertMatchOnTags(Builder.create().addDb("fx"), "111111111111000000");
+    assertMatchOnTags(Builder.create().addTable("fx", "futures"), "000001111000000000");
+    assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "futures", buildParts("ccy", "JPY")),
+        "000000110000000000");
+    assertMatchOnTags(Builder.create().addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE"))
+        .addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE")), "000000000000110000");
+    assertMatchOnTags(Builder.create().addTable("fx", "rates").addTable("fx", "futures"),
+        "111111111000000000");
+    assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates", buildParts("from", "PLN")),
+        "000000000000000000");
+    assertMatchOnTags(Builder.create().addTable("fixedincome", "bonds"), "000000000000000110");
+    assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates", buildParts("from", "EUR", "to", "HUF")),
+        "000010000000000000");
+  }
+
+  private static LinkedHashMap<String, String> buildParts(String... vals) {
+    LinkedHashMap<String, String> ret = new LinkedHashMap<>();
+    for (int i = 0; i < vals.length; i+=2) { // vals alternates partition key, partition value
+      ret.put(vals[i], vals[i+1]);
+    }
+    return ret;
+  }
+
+  private static void assertMatchOnTags(Builder requestBuilder, String expected) {
+    assertEquals(TEST_TAGS.length, expected.length());
+    // Marshal + unmarshal the request first; expected holds one '1' (match) or '0' char per TEST_TAGS entry.
+    Request request = Builder.create().fromProtoRequest(requestBuilder.build().toProtoRequests().get(0)).build();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < TEST_TAGS.length; ++i) {
+      sb.append(request.isTagMatch(TEST_TAGS[i]) ? '1' : '0');
+    }
+    assertEquals(expected, sb.toString());
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
index ba6d33e..522d6da 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import javax.net.SocketFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
@@ -229,12 +230,59 @@ public final class ProactiveEviction {
     }
 
     /**
-     * Match a CacheTag to this eviction request.
+     * Match a CacheTag to this eviction request. Must be used on the LLAP side only, where the received request may
+     * only contain information for a single DB.
+     * Matches at DB, table or partition level depending on how specific the request is.
      * @param cacheTag
      * @return true if cacheTag matches and the related buffer is eligible for proactive eviction, false otherwise.
      */
     public boolean isTagMatch(CacheTag cacheTag) {
-      // TODO: HIVE-23198
+      String db = getSingleDbName();
+      if (db == null) {
+        // Number of DBs in the request was not exactly 1.
+        throw new UnsupportedOperationException("Predicate only implemented for 1 DB case.");
+      }
+      TableName tagTableName = TableName.fromString(cacheTag.getTableName(), null, null);
+
+      // Check against DB.
+      if (!db.equals(tagTableName.getDb())) {
+        return false;
+      }
+
+      Map<String, Set<LinkedHashMap<String, String>>> tables = entities.get(db);
+
+      // If true, must be a drop DB event and this cacheTag matches.
+      if (tables.isEmpty()) {
+        return true;
+      }
+
+      Map<String, String> tagPartDescMap = null;
+      if (cacheTag instanceof CacheTag.PartitionCacheTag) {
+        tagPartDescMap = ((CacheTag.PartitionCacheTag) cacheTag).getPartitionDescMap();
+      }
+
+      // Check against table name.
+      for (String tableAndDbName : tables.keySet()) {
+        if (tableAndDbName.equals(tagTableName.getNotEmptyDbTable())) {
+
+          Set<LinkedHashMap<String, String>> partDescs = tables.get(tableAndDbName);
+
+          // If true, must be a drop table event, and this cacheTag matches.
+          if (partDescs == null) {
+            return true;
+          }
+
+          // Otherwise this is a drop partition event: check against partition keys and values.
+          if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) {
+            throw new IllegalArgumentException("CacheTag has no partition information, while trying" +
+                " to evict due to (and based on) a drop partition DDL statement..");
+          }
+
+          if (partDescs.contains(tagPartDescMap)) {
+            return true;
+          }
+        }
+      }
       return false;
     }
 
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
index ba6e534..51fbd64 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.common.io;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
@@ -80,7 +80,7 @@ public abstract class CacheTag implements Comparable<CacheTag> {
     return new TableCacheTag(tableName);
   }
 
-  public static final CacheTag build(String tableName, Map<String, String> partDescMap) {
+  public static final CacheTag build(String tableName, LinkedHashMap<String, String> partDescMap) {
     if (StringUtils.isEmpty(tableName) || partDescMap == null || partDescMap.isEmpty()) {
       throw new IllegalArgumentException();
     }
@@ -179,7 +179,7 @@ public abstract class CacheTag implements Comparable<CacheTag> {
      * Returns a map of partition keys and values built from the information of this CacheTag.
      * @return the map
      */
-    public abstract Map<String, String> getPartitionDescMap();
+    public abstract LinkedHashMap<String, String> getPartitionDescMap();
 
   }
 
@@ -204,8 +204,8 @@ public abstract class CacheTag implements Comparable<CacheTag> {
     }
 
     @Override
-    public Map<String, String> getPartitionDescMap() {
-      Map<String, String> result = new HashMap<>();
+    public LinkedHashMap<String, String> getPartitionDescMap() {
+      LinkedHashMap<String, String> result = new LinkedHashMap<>();
       String[] partition = CacheTag.decodePartDesc(partitionDesc);
       result.put(partition[0], partition[1]);
       return result;
@@ -305,8 +305,8 @@ public abstract class CacheTag implements Comparable<CacheTag> {
     }
 
     @Override
-    public Map<String, String> getPartitionDescMap() {
-      Map<String, String> result = new HashMap<>();
+    public LinkedHashMap<String, String> getPartitionDescMap() {
+      LinkedHashMap<String, String> result = new LinkedHashMap<>();
       for (String partDesc : partitionDesc) {
         String[] partition = CacheTag.decodePartDesc(partDesc);
         result.put(partition[0], partition[1]);