You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by sz...@apache.org on 2019/11/11 12:11:41 UTC
[hive] branch master updated: HIVE-22414: Make LLAP CacheTags more memory efficient (Adam Szita, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3061a04 HIVE-22414: Make LLAP CacheTags more memory efficient (Adam Szita, reviewed by Peter Vary)
3061a04 is described below
commit 3061a04018261a042827bce16e04412743523fde
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Mon Oct 28 13:39:35 2019 +0100
HIVE-22414: Make LLAP CacheTags more memory efficient (Adam Szita, reviewed by Peter Vary)
---
.../hive/llap/io/encoded/OrcEncodedDataReader.java | 3 +-
.../hive/llap/cache/TestCacheContentsTracker.java | 34 +++++-
.../org/apache/hadoop/hive/common/io/CacheTag.java | 133 +++++++++++++++------
3 files changed, 127 insertions(+), 43 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 3fcf0dc..9e61322 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -283,8 +283,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
recordReaderTime(startTime);
return null;
}
- counters.setDesc(QueryFragmentCounters.Desc.TABLE,
- LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), false, parts));
+ counters.setDesc(QueryFragmentCounters.Desc.TABLE, cacheTag.getTableName());
counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
+ (fileKey == null ? "" : " (" + fileKey + ")"));
try {
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
index 03e66a8..175cbac 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -17,15 +17,14 @@
*/
package org.apache.hadoop.hive.llap.cache;
-import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.junit.BeforeClass;
import org.junit.Test;
-import static java.util.stream.Collectors.toCollection;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -123,6 +122,27 @@ public class TestCacheContentsTracker {
}
+ @Test
+ public void testEncodingDecoding() throws Exception {
+ Map<String, String> partDescs = new HashMap<>();
+ partDescs.put("pytha=goras", "a2+b2=c2");
+ CacheTag tag = CacheTag.build("math.rules", partDescs);
+ CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag);
+ assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString());
+ assertEquals(1, stag.getPartitionDescMap().size());
+ assertEquals("a2+b2=c2", stag.getPartitionDescMap().get("pytha=goras"));
+
+ partDescs.clear();
+ partDescs.put("mutli=one", "one=/1");
+ partDescs.put("mutli=two/", "two=2");
+ tag = CacheTag.build("math.rules", partDescs);
+ CacheTag.MultiPartitionCacheTag mtag = ((CacheTag.MultiPartitionCacheTag)tag);
+ assertEquals("mutli=one=one=/1/mutli=two/=two=2", mtag.partitionDescToString());
+ assertEquals(2, mtag.getPartitionDescMap().size());
+ assertEquals("one=/1", mtag.getPartitionDescMap().get("mutli=one"));
+ assertEquals("two=2", mtag.getPartitionDescMap().get("mutli=two/"));
+ }
+
private static void compareViceVersa(int expected, CacheTag a, CacheTag b) {
if (a != null) {
assertEquals(expected, a.compareTo(b));
@@ -148,8 +168,12 @@ public class TestCacheContentsTracker {
private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
if (partitions != null && partitions.length > 0) {
- LinkedList<String> parts = Arrays.stream(partitions).collect(toCollection(LinkedList::new));
- return CacheTag.build(dbAndTable, parts);
+ Map<String, String> partDescs = new HashMap<>();
+ for (String partition : partitions) {
+ String[] partDesc = partition.split("=");
+ partDescs.put(partDesc[0], partDesc[1]);
+ }
+ return CacheTag.build(dbAndTable, partDescs);
} else {
return CacheTag.build(dbAndTable);
}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
index 47730cc..3062caa 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hive.common.io;
-import java.util.LinkedList;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
@@ -31,6 +34,9 @@ import org.apache.commons.lang.StringUtils;
* DB/table/1st_partition/.../nth_partition .
*/
public abstract class CacheTag implements Comparable<CacheTag> {
+
+ private static final String ENCODING = "UTF-8";
+
/**
* Prepended by DB name and '.' .
*/
@@ -79,33 +85,18 @@ public abstract class CacheTag implements Comparable<CacheTag> {
throw new IllegalArgumentException();
}
- LinkedList<String> partDescList = new LinkedList<>();
+ String[] partDescs = new String[partDescMap.size()];
+ int i = 0;
for (Map.Entry<String, String> entry : partDescMap.entrySet()) {
- StringBuilder sb = new StringBuilder();
- sb.append(entry.getKey()).append("=").append(entry.getValue());
- partDescList.add(sb.toString());
- }
-
- if (partDescList.size() == 1) {
- return new SinglePartitionCacheTag(tableName, partDescList.get(0));
- } else {
- // In this case it must be >1
- return new MultiPartitionCacheTag(tableName, partDescList);
- }
- }
-
- // Assumes elements of partDescList are already in p1=v1 format
- public static final CacheTag build(String tableName, LinkedList<String> partDescList) {
- if (StringUtils.isEmpty(tableName) || partDescList == null || partDescList.isEmpty()) {
- throw new IllegalArgumentException();
+ partDescs[i++] = encodePartDesc(entry.getKey(), entry.getValue());
}
- if (partDescList.size() == 1) {
- return new SinglePartitionCacheTag(tableName, partDescList.get(0));
+ if (partDescs.length == 1) {
+ return new SinglePartitionCacheTag(tableName, partDescs[0]);
} else {
// In this case it must be >1
- return new MultiPartitionCacheTag(tableName, partDescList);
+ return new MultiPartitionCacheTag(tableName, partDescs);
}
}
@@ -121,13 +112,15 @@ public abstract class CacheTag implements Comparable<CacheTag> {
if (tag instanceof MultiPartitionCacheTag) {
MultiPartitionCacheTag multiPartitionCacheTag = (MultiPartitionCacheTag) tag;
- if (multiPartitionCacheTag.partitionDesc.size() > 2) {
- LinkedList<String> subList = new LinkedList<>(multiPartitionCacheTag.partitionDesc);
- subList.removeLast();
+ if (multiPartitionCacheTag.partitionDesc.length > 2) {
+ String[] subList = new String[multiPartitionCacheTag.partitionDesc.length - 1];
+ for (int i = 0; i < subList.length; ++i) {
+ subList[i] = multiPartitionCacheTag.partitionDesc[i];
+ }
return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, subList);
} else {
return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName,
- multiPartitionCacheTag.partitionDesc.get(0));
+ multiPartitionCacheTag.partitionDesc[0]);
}
}
@@ -182,6 +175,12 @@ public abstract class CacheTag implements Comparable<CacheTag> {
*/
public abstract String partitionDescToString();
+ /**
+ * Returns a map of partition keys and values built from the information of this CacheTag.
+ * @return the map
+ */
+ public abstract Map<String, String> getPartitionDescMap();
+
}
/**
@@ -201,7 +200,15 @@ public abstract class CacheTag implements Comparable<CacheTag> {
@Override
public String partitionDescToString() {
- return this.partitionDesc;
+ return String.join("=", CacheTag.decodePartDesc(partitionDesc));
+ }
+
+ @Override
+ public Map<String, String> getPartitionDescMap() {
+ Map<String, String> result = new HashMap<>();
+ String[] partition = CacheTag.decodePartDesc(partitionDesc);
+ result.put(partition[0], partition[1]);
+ return result;
}
@Override
@@ -234,13 +241,15 @@ public abstract class CacheTag implements Comparable<CacheTag> {
*/
public static final class MultiPartitionCacheTag extends PartitionCacheTag {
- private final LinkedList<String> partitionDesc;
+ private final String[] partitionDesc;
- private MultiPartitionCacheTag(String tableName, LinkedList<String> partitionDesc) {
+ private MultiPartitionCacheTag(String tableName, String[] partitionDesc) {
super(tableName);
- this.partitionDesc = partitionDesc;
- if (this.partitionDesc != null && this.partitionDesc.size() > 1) {
- this.partitionDesc.stream().forEach(p -> p.intern());
+ if (partitionDesc != null && partitionDesc.length > 1) {
+ for (int i = 0; i < partitionDesc.length; ++i) {
+ partitionDesc[i] = partitionDesc[i].intern();
+ }
+ this.partitionDesc = partitionDesc;
} else {
throw new IllegalArgumentException();
}
@@ -259,12 +268,12 @@ public abstract class CacheTag implements Comparable<CacheTag> {
if (tableNameDiff != 0) {
return tableNameDiff;
} else {
- int sizeDiff = partitionDesc.size() - other.partitionDesc.size();
+ int sizeDiff = partitionDesc.length - other.partitionDesc.length;
if (sizeDiff != 0) {
return sizeDiff;
} else {
- for (int i = 0; i < partitionDesc.size(); ++i) {
- int partDiff = partitionDesc.get(i).compareTo(other.partitionDesc.get(i));
+ for (int i = 0; i < partitionDesc.length; ++i) {
+ int partDiff = partitionDesc[i].compareTo(other.partitionDesc[i]);
if (partDiff != 0) {
return partDiff;
}
@@ -285,9 +294,61 @@ public abstract class CacheTag implements Comparable<CacheTag> {
@Override
public String partitionDescToString() {
- return String.join("/", this.partitionDesc);
+ StringBuilder sb = new StringBuilder();
+ for (String partDesc : partitionDesc) {
+ String[] partition = CacheTag.decodePartDesc(partDesc);
+ sb.append(partition[0]).append('=').append(partition[1]);
+ sb.append('/');
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ return sb.toString();
}
+ @Override
+ public Map<String, String> getPartitionDescMap() {
+ Map<String, String> result = new HashMap<>();
+ for (String partDesc : partitionDesc) {
+ String[] partition = CacheTag.decodePartDesc(partDesc);
+ result.put(partition[0], partition[1]);
+ }
+ return result;
+ }
+ }
+
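+ /**
+ * Combines partition key and value Strings into one by URL-encoding each and concatenating them with '='.
+ * @param partKey the partition key, encoded with URLEncoder using UTF-8
+ * @param partVal the partition value, encoded with URLEncoder using UTF-8
+ * @return the encoded key and value joined as "key=value"
+ */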
+ private static String encodePartDesc(String partKey, String partVal) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append(
+ URLEncoder.encode(partKey, ENCODING))
+ .append('=')
+ .append(URLEncoder.encode(partVal, ENCODING));
+ return sb.toString();
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
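+ /**
+ * Splits and decodes a partition desc String encoded by encodePartDesc().
+ * @param partDesc an encoded "key=value" String produced by encodePartDesc()
+ * @return a two-element array holding the URL-decoded key and value
+ */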
+ private static String[] decodePartDesc(String partDesc) {
+ try {
+ String[] encodedPartDesc = partDesc.split("=");
+ assert encodedPartDesc.length == 2;
+ return new String[] {
+ URLDecoder.decode(encodedPartDesc[0], ENCODING),
+ URLDecoder.decode(encodedPartDesc[1], ENCODING)};
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
}
}