You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2020/07/23 13:01:17 UTC
[hive] branch master updated: HIVE-23198: Add matching logic
between CacheTags and proactive eviction requests (Adam Szita,
reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 610a663 HIVE-23198: Add matching logic between CacheTags and proactive eviction requests (Adam Szita, reviewed by Peter Vary)
610a663 is described below
commit 610a663e1cf36114339fb36388fcbefbda2a3b97
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Thu Jul 23 15:00:58 2020 +0200
HIVE-23198: Add matching logic between CacheTags and proactive eviction requests (Adam Szita, reviewed by Peter Vary)
---
.../hive/llap/cache/TestCacheContentsTracker.java | 7 +-
.../hive/llap/cache/TestProactiveEviction.java | 96 ++++++++++++++++++++++
.../apache/hadoop/hive/llap/ProactiveEviction.java | 52 +++++++++++-
.../org/apache/hadoop/hive/common/io/CacheTag.java | 14 ++--
4 files changed, 157 insertions(+), 12 deletions(-)
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
index 175cbac..15d3f8f 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.llap.cache;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.hive.common.io.CacheTag;
@@ -124,7 +125,7 @@ public class TestCacheContentsTracker {
@Test
public void testEncodingDecoding() throws Exception {
- Map<String, String> partDescs = new HashMap<>();
+ LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
partDescs.put("pytha=goras", "a2+b2=c2");
CacheTag tag = CacheTag.build("math.rules", partDescs);
CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag);
@@ -166,9 +167,9 @@ public class TestCacheContentsTracker {
return llapCacheableBufferMock;
}
- private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
+ public static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
if (partitions != null && partitions.length > 0) {
- Map<String, String> partDescs = new HashMap<>();
+ LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
for (String partition : partitions) {
String[] partDesc = partition.split("=");
partDescs.put(partDesc[0], partDesc[1]);
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
new file mode 100644
index 0000000..689e27f
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.llap.ProactiveEviction;
+import org.apache.hadoop.hive.llap.ProactiveEviction.Request;
+import org.apache.hadoop.hive.llap.ProactiveEviction.Request.Builder;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.llap.cache.TestCacheContentsTracker.cacheTagBuilder;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases for proactive LLAP cache eviction.
+ */
+public class TestProactiveEviction {
+
+ private static final CacheTag[] TEST_TAGS = new CacheTag[] {
+ cacheTagBuilder("fx.rates", "from=USD", "to=HUF"),
+ cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),
+ cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),
+ cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),
+ cacheTagBuilder("fx.rates", "from=EUR", "to=HUF"),
+ cacheTagBuilder("fx.futures", "ccy=EUR"),
+ cacheTagBuilder("fx.futures", "ccy=JPY"),
+ cacheTagBuilder("fx.futures", "ccy=JPY"),
+ cacheTagBuilder("fx.futures", "ccy=USD"),
+ cacheTagBuilder("fx.centralbanks"),
+ cacheTagBuilder("fx.centralbanks"),
+ cacheTagBuilder("fx.centralbanks"),
+ cacheTagBuilder("equity.prices", "ex=NYSE"),
+ cacheTagBuilder("equity.prices", "ex=NYSE"),
+ cacheTagBuilder("equity.prices", "ex=NASDAQ"),
+ cacheTagBuilder("fixedincome.bonds"),
+ cacheTagBuilder("fixedincome.bonds"),
+ cacheTagBuilder("fixedincome.yieldcurves")
+ };
+
+ @Test
+ public void testCachetagAndRequestMatching() throws Exception {
+ assertMatchOnTags(Builder.create().addDb("fx"), "111111111111000000");
+ assertMatchOnTags(Builder.create().addTable("fx", "futures"), "000001111000000000");
+ assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "futures", buildParts("ccy", "JPY")),
+ "000000110000000000");
+ assertMatchOnTags(Builder.create().addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE"))
+ .addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE")),"000000000000110000");
+ assertMatchOnTags(Builder.create().addTable("fx", "rates").addTable("fx", "futures"),
+ "111111111000000000");
+ assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates", buildParts("from", "PLN")),
+ "000000000000000000");
+ assertMatchOnTags(Builder.create().addTable("fixedincome", "bonds"), "000000000000000110");
+ assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates", buildParts("from", "EUR", "to", "HUF")),
+ "000010000000000000");
+ }
+
+ private static LinkedHashMap buildParts(String... vals) {
+ LinkedHashMap<String, String> ret = new LinkedHashMap<>();
+ for (int i = 0; i < vals.length; i+=2) {
+ ret.put(vals[i], vals[i+1]);
+ }
+ return ret;
+ }
+
+ private static void assertMatchOnTags(Builder requestBuilder, String expected) {
+ assert expected.length() == TEST_TAGS.length;
+ // Marshal + unmarshal
+ Request request = Builder.create().fromProtoRequest(requestBuilder.build().toProtoRequests().get(0)).build();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < TEST_TAGS.length; ++i) {
+ sb.append(request.isTagMatch(TEST_TAGS[i]) ? '1' : '0');
+ }
+ assertEquals(expected, sb.toString());
+ }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
index ba6d33e..522d6da 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
@@ -229,12 +230,59 @@ public final class ProactiveEviction {
}
/**
- * Match a CacheTag to this eviction request.
+ * Match a CacheTag to this eviction request. Must only be used on LLAP side only, where the received request may
+ * only contain one information for one DB.
+ *
* @param cacheTag
* @return true if cacheTag matches and the related buffer is eligible for proactive eviction, false otherwise.
*/
public boolean isTagMatch(CacheTag cacheTag) {
- // TODO: HIVE-23198
+ String db = getSingleDbName();
+ if (db == null) {
+ // Number of DBs in the request was not exactly 1.
+ throw new UnsupportedOperationException("Predicate only implemented for 1 DB case.");
+ }
+ TableName tagTableName = TableName.fromString(cacheTag.getTableName(), null, null);
+
+ // Check against DB.
+ if (!db.equals(tagTableName.getDb())) {
+ return false;
+ }
+
+ Map<String, Set<LinkedHashMap<String, String>>> tables = entities.get(db);
+
+ // If true, must be a drop DB event and this cacheTag matches.
+ if (tables.isEmpty()) {
+ return true;
+ }
+
+ Map<String, String> tagPartDescMap = null;
+ if (cacheTag instanceof CacheTag.PartitionCacheTag) {
+ tagPartDescMap = ((CacheTag.PartitionCacheTag) cacheTag).getPartitionDescMap();
+ }
+
+ // Check against table name.
+ for (String tableAndDbName : tables.keySet()) {
+ if (tableAndDbName.equals(tagTableName.getNotEmptyDbTable())) {
+
+ Set<LinkedHashMap<String, String>> partDescs = tables.get(tableAndDbName);
+
+ // If true, must be a drop table event, and this cacheTag matches.
+ if (partDescs == null) {
+ return true;
+ }
+
+ // Check against partition keys and values and alas for drop partition event.
+ if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) {
+ throw new IllegalArgumentException("CacheTag has no partition information, while trying" +
+ " to evict due to (and based on) a drop partition DDL statement..");
+ }
+
+ if (partDescs.contains(tagPartDescMap)) {
+ return true;
+ }
+ }
+ }
return false;
}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
index ba6e534..51fbd64 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.common.io;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -80,7 +80,7 @@ public abstract class CacheTag implements Comparable<CacheTag> {
return new TableCacheTag(tableName);
}
- public static final CacheTag build(String tableName, Map<String, String> partDescMap) {
+ public static final CacheTag build(String tableName, LinkedHashMap<String, String> partDescMap) {
if (StringUtils.isEmpty(tableName) || partDescMap == null || partDescMap.isEmpty()) {
throw new IllegalArgumentException();
}
@@ -179,7 +179,7 @@ public abstract class CacheTag implements Comparable<CacheTag> {
* Returns a map of partition keys and values built from the information of this CacheTag.
* @return the map
*/
- public abstract Map<String, String> getPartitionDescMap();
+ public abstract LinkedHashMap<String, String> getPartitionDescMap();
}
@@ -204,8 +204,8 @@ public abstract class CacheTag implements Comparable<CacheTag> {
}
@Override
- public Map<String, String> getPartitionDescMap() {
- Map<String, String> result = new HashMap<>();
+ public LinkedHashMap<String, String> getPartitionDescMap() {
+ LinkedHashMap<String, String> result = new LinkedHashMap<>();
String[] partition = CacheTag.decodePartDesc(partitionDesc);
result.put(partition[0], partition[1]);
return result;
@@ -305,8 +305,8 @@ public abstract class CacheTag implements Comparable<CacheTag> {
}
@Override
- public Map<String, String> getPartitionDescMap() {
- Map<String, String> result = new HashMap<>();
+ public LinkedHashMap<String, String> getPartitionDescMap() {
+ LinkedHashMap<String, String> result = new LinkedHashMap<>();
for (String partDesc : partitionDesc) {
String[] partition = CacheTag.decodePartDesc(partDesc);
result.put(partition[0], partition[1]);