You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2019/11/11 12:11:41 UTC

[hive] branch master updated: HIVE-22414: Make LLAP CacheTags more memory efficient (Adam Szita, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 3061a04  HIVE-22414: Make LLAP CacheTags more memory efficient (Adam Szita, reviewed by Peter Vary)
3061a04 is described below

commit 3061a04018261a042827bce16e04412743523fde
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Mon Oct 28 13:39:35 2019 +0100

    HIVE-22414: Make LLAP CacheTags more memory efficient (Adam Szita, reviewed by Peter Vary)
---
 .../hive/llap/io/encoded/OrcEncodedDataReader.java |   3 +-
 .../hive/llap/cache/TestCacheContentsTracker.java  |  34 +++++-
 .../org/apache/hadoop/hive/common/io/CacheTag.java | 133 +++++++++++++++------
 3 files changed, 127 insertions(+), 43 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 3fcf0dc..9e61322 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -283,8 +283,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       recordReaderTime(startTime);
       return null;
     }
-    counters.setDesc(QueryFragmentCounters.Desc.TABLE,
-        LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), false, parts));
+    counters.setDesc(QueryFragmentCounters.Desc.TABLE, cacheTag.getTableName());
     counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
         + (fileKey == null ? "" : " (" + fileKey + ")"));
     try {
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
index 03e66a8..175cbac 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.hive.common.io.CacheTag;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static java.util.stream.Collectors.toCollection;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -123,6 +122,27 @@ public class TestCacheContentsTracker {
 
   }
 
+  @Test
+  public void testEncodingDecoding() throws Exception {
+    Map<String, String> partDescs = new HashMap<>();
+    partDescs.put("pytha=goras", "a2+b2=c2");
+    CacheTag tag = CacheTag.build("math.rules", partDescs);
+    CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag);
+    assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString());
+    assertEquals(1, stag.getPartitionDescMap().size());
+    assertEquals("a2+b2=c2", stag.getPartitionDescMap().get("pytha=goras"));
+
+    partDescs.clear();
+    partDescs.put("mutli=one", "one=/1");
+    partDescs.put("mutli=two/", "two=2");
+    tag = CacheTag.build("math.rules", partDescs);
+    CacheTag.MultiPartitionCacheTag mtag = ((CacheTag.MultiPartitionCacheTag)tag);
+    assertEquals("mutli=one=one=/1/mutli=two/=two=2", mtag.partitionDescToString());
+    assertEquals(2, mtag.getPartitionDescMap().size());
+    assertEquals("one=/1", mtag.getPartitionDescMap().get("mutli=one"));
+    assertEquals("two=2", mtag.getPartitionDescMap().get("mutli=two/"));
+  }
+
   private static void compareViceVersa(int expected, CacheTag a, CacheTag b) {
     if (a != null) {
       assertEquals(expected, a.compareTo(b));
@@ -148,8 +168,12 @@ public class TestCacheContentsTracker {
 
   private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
     if (partitions != null && partitions.length > 0) {
-      LinkedList<String> parts = Arrays.stream(partitions).collect(toCollection(LinkedList::new));
-      return CacheTag.build(dbAndTable, parts);
+      Map<String, String> partDescs = new HashMap<>();
+      for (String partition : partitions) {
+        String[] partDesc = partition.split("=");
+        partDescs.put(partDesc[0], partDesc[1]);
+      }
+      return CacheTag.build(dbAndTable, partDescs);
     } else {
       return CacheTag.build(dbAndTable);
     }
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
index 47730cc..3062caa 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hive.common.io;
 
-import java.util.LinkedList;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
@@ -31,6 +34,9 @@ import org.apache.commons.lang.StringUtils;
  *     DB/table/1st_partition/.../nth_partition .
  */
 public abstract class CacheTag implements Comparable<CacheTag> {
+
+  private static final String ENCODING = "UTF-8";
+
   /**
    * Prepended by DB name and '.' .
    */
@@ -79,33 +85,18 @@ public abstract class CacheTag implements Comparable<CacheTag> {
       throw new IllegalArgumentException();
     }
 
-    LinkedList<String> partDescList = new LinkedList<>();
+    String[] partDescs = new String[partDescMap.size()];
+    int i = 0;
 
     for (Map.Entry<String, String> entry : partDescMap.entrySet()) {
-      StringBuilder sb = new StringBuilder();
-      sb.append(entry.getKey()).append("=").append(entry.getValue());
-      partDescList.add(sb.toString());
-    }
-
-    if (partDescList.size() == 1) {
-      return new SinglePartitionCacheTag(tableName, partDescList.get(0));
-    } else {
-      // In this case it must be >1
-      return new MultiPartitionCacheTag(tableName, partDescList);
-    }
-  }
-
-  // Assumes elements of partDescList are already in p1=v1 format
-  public static final CacheTag build(String tableName, LinkedList<String> partDescList) {
-    if (StringUtils.isEmpty(tableName) || partDescList == null || partDescList.isEmpty()) {
-      throw new IllegalArgumentException();
+      partDescs[i++] = encodePartDesc(entry.getKey(), entry.getValue());
     }
 
-    if (partDescList.size() == 1) {
-      return new SinglePartitionCacheTag(tableName, partDescList.get(0));
+    if (partDescs.length == 1) {
+      return new SinglePartitionCacheTag(tableName, partDescs[0]);
     } else {
       // In this case it must be >1
-      return new MultiPartitionCacheTag(tableName, partDescList);
+      return new MultiPartitionCacheTag(tableName, partDescs);
     }
   }
 
@@ -121,13 +112,15 @@ public abstract class CacheTag implements Comparable<CacheTag> {
 
     if (tag instanceof MultiPartitionCacheTag) {
       MultiPartitionCacheTag multiPartitionCacheTag = (MultiPartitionCacheTag) tag;
-      if (multiPartitionCacheTag.partitionDesc.size() > 2) {
-        LinkedList<String> subList = new LinkedList<>(multiPartitionCacheTag.partitionDesc);
-        subList.removeLast();
+      if (multiPartitionCacheTag.partitionDesc.length > 2) {
+        String[] subList = new String[multiPartitionCacheTag.partitionDesc.length - 1];
+        for (int i = 0; i < subList.length; ++i) {
+          subList[i] = multiPartitionCacheTag.partitionDesc[i];
+        }
         return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, subList);
       } else {
         return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName,
-            multiPartitionCacheTag.partitionDesc.get(0));
+            multiPartitionCacheTag.partitionDesc[0]);
       }
     }
 
@@ -182,6 +175,12 @@ public abstract class CacheTag implements Comparable<CacheTag> {
      */
     public abstract String partitionDescToString();
 
+    /**
+     * Returns a map of partition keys and values built from the information of this CacheTag.
+     * @return the map
+     */
+    public abstract Map<String, String> getPartitionDescMap();
+
   }
 
   /**
@@ -201,7 +200,15 @@ public abstract class CacheTag implements Comparable<CacheTag> {
 
     @Override
     public String partitionDescToString() {
-      return this.partitionDesc;
+      return String.join("=", CacheTag.decodePartDesc(partitionDesc));
+    }
+
+    @Override
+    public Map<String, String> getPartitionDescMap() {
+      Map<String, String> result = new HashMap<>();
+      String[] partition = CacheTag.decodePartDesc(partitionDesc);
+      result.put(partition[0], partition[1]);
+      return result;
     }
 
     @Override
@@ -234,13 +241,15 @@ public abstract class CacheTag implements Comparable<CacheTag> {
    */
   public static final class MultiPartitionCacheTag extends PartitionCacheTag {
 
-    private final LinkedList<String> partitionDesc;
+    private final String[] partitionDesc;
 
-    private MultiPartitionCacheTag(String tableName, LinkedList<String> partitionDesc) {
+    private MultiPartitionCacheTag(String tableName, String[] partitionDesc) {
       super(tableName);
-      this.partitionDesc = partitionDesc;
-      if (this.partitionDesc != null && this.partitionDesc.size() > 1) {
-        this.partitionDesc.stream().forEach(p -> p.intern());
+      if (partitionDesc != null && partitionDesc.length > 1) {
+        for (int i = 0; i < partitionDesc.length; ++i) {
+          partitionDesc[i] = partitionDesc[i].intern();
+        }
+        this.partitionDesc = partitionDesc;
       } else {
         throw new IllegalArgumentException();
       }
@@ -259,12 +268,12 @@ public abstract class CacheTag implements Comparable<CacheTag> {
       if (tableNameDiff != 0) {
         return tableNameDiff;
       } else {
-        int sizeDiff = partitionDesc.size() - other.partitionDesc.size();
+        int sizeDiff = partitionDesc.length - other.partitionDesc.length;
         if (sizeDiff != 0) {
           return sizeDiff;
         } else {
-          for (int i = 0; i < partitionDesc.size(); ++i) {
-            int partDiff = partitionDesc.get(i).compareTo(other.partitionDesc.get(i));
+          for (int i = 0; i < partitionDesc.length; ++i) {
+            int partDiff = partitionDesc[i].compareTo(other.partitionDesc[i]);
             if (partDiff != 0) {
               return partDiff;
             }
@@ -285,9 +294,61 @@ public abstract class CacheTag implements Comparable<CacheTag> {
 
     @Override
     public String partitionDescToString() {
-      return String.join("/", this.partitionDesc);
+      StringBuilder sb = new StringBuilder();
+      for (String partDesc : partitionDesc) {
+        String[] partition = CacheTag.decodePartDesc(partDesc);
+        sb.append(partition[0]).append('=').append(partition[1]);
+        sb.append('/');
+      }
+      sb.deleteCharAt(sb.length() - 1);
+      return sb.toString();
     }
 
+    @Override
+    public Map<String, String> getPartitionDescMap() {
+      Map<String, String> result = new HashMap<>();
+      for (String partDesc : partitionDesc) {
+        String[] partition = CacheTag.decodePartDesc(partDesc);
+        result.put(partition[0], partition[1]);
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Combines partition key and value Strings into one by encoding each and concatenating them with '='.
+   * @param partKey
+   * @param partVal
+   * @return
+   */
+  private static String encodePartDesc(String partKey, String partVal) {
+    try {
+      StringBuilder sb = new StringBuilder();
+      sb.append(
+          URLEncoder.encode(partKey, ENCODING))
+          .append('=')
+          .append(URLEncoder.encode(partVal, ENCODING));
+      return sb.toString();
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Splits and decodes a partition desc String encoded by encodePartDesc().
+   * @param partDesc
+   * @return
+   */
+  private static String[] decodePartDesc(String partDesc) {
+    try {
+      String[] encodedPartDesc = partDesc.split("=");
+      assert encodedPartDesc.length == 2;
+      return new String[] {
+          URLDecoder.decode(encodedPartDesc[0], ENCODING),
+          URLDecoder.decode(encodedPartDesc[1], ENCODING)};
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
   }
 
 }