Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/21 22:41:16 UTC

svn commit: r1633455 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/test/queries/clientposi...

Author: gunther
Date: Tue Oct 21 20:41:16 2014
New Revision: 1633455

URL: http://svn.apache.org/r1633455
Log:
HIVE-8341: Transaction information in config file can grow excessively large (Alan Gates via Gunther Hagleitner)

Added:
    hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java
    hive/trunk/ql/src/test/queries/clientpositive/transform_acid.q
    hive/trunk/ql/src/test/results/clientpositive/transform_acid.q.out
    hive/trunk/ql/src/test/scripts/transform_acid_grep.sh
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java?rev=1633455&r1=1633454&r2=1633455&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnListImpl.java Tue Oct 21 20:41:16 2014
@@ -18,10 +18,23 @@
 
 package org.apache.hadoop.hive.common;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 public class ValidTxnListImpl implements ValidTxnList {
 
+  static final private Log LOG = LogFactory.getLog(ValidTxnListImpl.class.getName());
+  final static private int MAX_UNCOMPRESSED_LENGTH = 256;
+  final static private char COMPRESSION_MARKER = 'C';
+  final static private String STRING_ENCODING = "ISO-8859-1";
+
   private long[] exceptions;
   private long highWatermark;
 
@@ -95,7 +108,25 @@ public class ValidTxnListImpl implements
         buf.append(except);
       }
     }
-    return buf.toString();
+    if (buf.length() > MAX_UNCOMPRESSED_LENGTH) {
+      try {
+        ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+        GZIPOutputStream gzip = new GZIPOutputStream(byteBuf);
+        gzip.write(buf.toString().getBytes());
+        gzip.close();
+        StringBuilder buf2 = new StringBuilder();
+        buf2.append(COMPRESSION_MARKER);
+        buf2.append(buf.length());
+        buf2.append(':');
+        buf2.append(byteBuf.toString(STRING_ENCODING));
+        return buf2.toString();
+      } catch (IOException e) {
+        LOG.error("Unable to compress transaction list, " + e.getMessage());
+        throw new RuntimeException(e);
+      }
+    } else {
+      return buf.toString();
+    }
   }
 
   @Override
@@ -104,11 +135,36 @@ public class ValidTxnListImpl implements
       highWatermark = Long.MAX_VALUE;
       exceptions = new long[0];
     } else {
-      String[] values = src.split(":");
+      String[] values;
+      if (src.charAt(0) == COMPRESSION_MARKER) {
+        try {
+          int colon = src.indexOf(':');
+          int len = Integer.valueOf(src.substring(1, colon));
+          ByteArrayInputStream byteBuf =
+              new ByteArrayInputStream(src.substring(colon + 1).getBytes(STRING_ENCODING));
+          GZIPInputStream gzip = new GZIPInputStream(byteBuf);
+          byte[] buf = new byte[len];
+          int bytesRead = 0;
+          int offset = 0;
+          int maxReadLen = len;
+          do {
+            bytesRead = gzip.read(buf, offset, maxReadLen);
+            offset += bytesRead;
+            maxReadLen -= bytesRead;
+          } while (maxReadLen > 0);
+          values = new String(buf).split(":");
+        } catch (IOException e) {
+          LOG.error("Unable to decode compressed transaction list, " + e.getMessage());
+          throw new RuntimeException(e);
+        }
+
+      } else {
+        values = src.split(":");
+      }
       highWatermark = Long.parseLong(values[0]);
       exceptions = new long[values.length - 1];
-      for(int i = 1; i < values.length; ++i) {
-        exceptions[i-1] = Long.parseLong(values[i]);
+      for (int i = 1; i < values.length; ++i) {
+        exceptions[i - 1] = Long.parseLong(values[i]);
       }
     }
   }

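For readers following the new encoding in writeToString/readFromString above: once the serialized list exceeds MAX_UNCOMPRESSED_LENGTH (256 characters), it is gzipped and emitted as the marker 'C', the uncompressed length, a colon, and the raw gzip bytes mapped through ISO-8859-1 (a charset that round-trips all 256 byte values). The sketch below is a minimal, self-contained illustration of that format, not code from this commit; unlike the read loop in the patch, it also stops on end-of-stream instead of spinning if the input is truncated:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public class TxnListEncodingSketch {
      static final char COMPRESSION_MARKER = 'C';
      static final String STRING_ENCODING = "ISO-8859-1";

      // "2000:100:101:..." -> "C<uncompressedLength>:<gzip bytes>"
      static String compress(String plain) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bytes);
        gzip.write(plain.getBytes(STRING_ENCODING));
        gzip.close();
        return COMPRESSION_MARKER + Integer.toString(plain.length()) + ":"
            + bytes.toString(STRING_ENCODING);
      }

      static String decompress(String packed) throws IOException {
        int colon = packed.indexOf(':');
        int len = Integer.parseInt(packed.substring(1, colon));
        GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(
            packed.substring(colon + 1).getBytes(STRING_ENCODING)));
        byte[] buf = new byte[len];
        int offset = 0;
        while (offset < len) {
          int n = gzip.read(buf, offset, len - offset);
          if (n < 0) {  // EOF before len bytes were produced: corrupt input
            throw new IOException("Truncated compressed transaction list");
          }
          offset += n;
        }
        return new String(buf, STRING_ENCODING);
      }

      public static void main(String[] args) throws IOException {
        String plain = "2000:100:101:102:103";
        String packed = compress(plain);
        System.out.println(packed.charAt(0));                  // C
        System.out.println(decompress(packed).equals(plain));  // true
      }
    }
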
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1633455&r1=1633454&r2=1633455&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Oct 21 20:41:16 2014
@@ -608,6 +608,10 @@ public class HiveConf extends Configurat
         "transform function (the custom mapper/reducer that the user has specified in the query)"),
     HIVESCRIPTTRUNCATEENV("hive.script.operator.truncate.env", false,
         "Truncate each environment variable for external script in scripts operator to 20KB (to fit system limits)"),
+    HIVESCRIPT_ENV_BLACKLIST("hive.script.operator.env.blacklist",
+        "hive.txn.valid.txns,hive.script.operator.env.blacklist",
+        "Comma separated list of keys from the configuration file not to convert to environment " +
+        "variables when envoking the script operator"),
     HIVEMAPREDMODE("hive.mapred.mode", "nonstrict",
         "The mode in which the Hive operations are being performed. \n" +
         "In strict mode, some risky queries are not allowed to run. They include:\n" +

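The new hive.script.operator.env.blacklist property defaults to the transaction-list key plus the blacklist key itself, and users can extend it with their own oversized keys. A hedged sketch of reading and extending it, assuming the usual HiveConf getVar/setVar accessors (my.very.large.key is a made-up example):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class BlacklistConfigSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Default: "hive.txn.valid.txns,hive.script.operator.env.blacklist"
        String blacklist = conf.getVar(HiveConf.ConfVars.HIVESCRIPT_ENV_BLACKLIST);
        // Keep another oversized key out of script environments as well.
        conf.setVar(HiveConf.ConfVars.HIVESCRIPT_ENV_BLACKLIST,
            blacklist + ",my.very.large.key");
      }
    }
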
Added: hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java?rev=1633455&view=auto
==============================================================================
--- hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java (added)
+++ hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidTxnImpl.java Tue Oct 21 20:41:16 2014
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+/**
+ * Tests for {@link org.apache.hadoop.hive.common.ValidTxnListImpl}
+ */
+public class TestValidTxnImpl {
+
+  @Test
+  public void noExceptions() throws Exception {
+    ValidTxnList txnList = new ValidTxnListImpl(new long[0], 1);
+    String str = txnList.writeToString();
+    Assert.assertEquals("1:", str);
+    ValidTxnList newList = new ValidTxnListImpl();
+    newList.readFromString(str);
+    Assert.assertTrue(newList.isTxnCommitted(1));
+    Assert.assertFalse(newList.isTxnCommitted(2));
+  }
+
+  @Test
+  public void exceptions() throws Exception {
+    ValidTxnList txnList = new ValidTxnListImpl(new long[]{2L,4L}, 5);
+    String str = txnList.writeToString();
+    Assert.assertEquals("5:2:4", str);
+    ValidTxnList newList = new ValidTxnListImpl();
+    newList.readFromString(str);
+    Assert.assertTrue(newList.isTxnCommitted(1));
+    Assert.assertFalse(newList.isTxnCommitted(2));
+    Assert.assertTrue(newList.isTxnCommitted(3));
+    Assert.assertFalse(newList.isTxnCommitted(4));
+    Assert.assertTrue(newList.isTxnCommitted(5));
+    Assert.assertFalse(newList.isTxnCommitted(6));
+  }
+
+  @Test
+  public void longEnoughToCompress() throws Exception {
+    long[] exceptions = new long[1000];
+    for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
+    ValidTxnList txnList = new ValidTxnListImpl(exceptions, 2000);
+    String str = txnList.writeToString();
+    Assert.assertEquals('C', str.charAt(0));
+    ValidTxnList newList = new ValidTxnListImpl();
+    newList.readFromString(str);
+    for (int i = 0; i < 100; i++) Assert.assertTrue(newList.isTxnCommitted(i));
+    for (int i = 100; i < 1100; i++) Assert.assertFalse(newList.isTxnCommitted(i));
+    for (int i = 1100; i < 2001; i++) Assert.assertTrue(newList.isTxnCommitted(i));
+    Assert.assertFalse(newList.isTxnCommitted(2001));
+  }
+
+  @Test
+  public void readWriteConfig() throws Exception {
+    long[] exceptions = new long[1000];
+    for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
+    ValidTxnList txnList = new ValidTxnListImpl(exceptions, 2000);
+    String str = txnList.writeToString();
+    Assert.assertEquals('C', str.charAt(0));
+    Configuration conf = new Configuration();
+    conf.set(ValidTxnList.VALID_TXNS_KEY, str);
+    File tmpFile = File.createTempFile("TestValidTxnImpl", "readWriteConfig");
+    DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpFile));
+    conf.write(out);
+    out.close();
+    DataInputStream in = new DataInputStream(new FileInputStream(tmpFile));
+    Configuration newConf = new Configuration();
+    newConf.readFields(in);
+    Assert.assertEquals(str, newConf.get(ValidTxnList.VALID_TXNS_KEY));
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1633455&r1=1633454&r2=1633455&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Tue Oct 21 20:41:16 2014
@@ -27,8 +27,10 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
@@ -82,6 +84,8 @@ public class ScriptOperator extends Oper
   transient Deserializer scriptOutputDeserializer;
   transient volatile Throwable scriptError = null;
   transient RecordWriter scriptOutWriter = null;
+  // List of conf entries not to turn into env vars
+  transient Set<String> blackListedConfEntries = null;
 
   static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
   static final String IO_EXCEPTION_STREAM_CLOSED = "Stream closed";
@@ -119,7 +123,8 @@ public class ScriptOperator extends Oper
 
   /**
    * Most UNIX implementations impose some limit on the total size of environment variables and
-   * size of strings. To fit in this limit we need sometimes to truncate strings.
+   * size of strings. To fit within this limit we sometimes need to truncate strings.  Also,
+   * some values tend to be long and are meaningless to scripts, so filter them out.
    * @param value environment variable value to check
    * @param name name of variable (used only for logging purposes)
    * @param truncate truncate value or not
@@ -138,6 +143,23 @@ public class ScriptOperator extends Oper
     return value;
   }
 
+  boolean blackListed(String name) {
+    if (blackListedConfEntries == null) {
+      blackListedConfEntries = new HashSet<String>();
+      if (hconf != null) {
+        String bl = hconf.get(HiveConf.ConfVars.HIVESCRIPT_ENV_BLACKLIST.toString());
+        if (bl != null && bl.length() > 0) {
+          String[] bls = bl.split(",");
+          for (String b : bls) {
+            // config keys are matched verbatim, before env-var name mangling
+            blackListedConfEntries.add(b);
+          }
+        }
+      }
+    }
+    return blackListedConfEntries.contains(name);
+  }
+
   /**
    * addJobConfToEnvironment is mostly shamelessly copied from hadoop streaming. Added additional
    * check on environment variable length
@@ -147,13 +169,16 @@ public class ScriptOperator extends Oper
     while (it.hasNext()) {
       Map.Entry<String, String> en = it.next();
       String name = en.getKey();
-      // String value = (String)en.getValue(); // does not apply variable
-      // expansion
-      String value = conf.get(name); // does variable expansion
-      name = safeEnvVarName(name);
-      boolean truncate = conf.getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
-      value = safeEnvVarValue(value, name, truncate);
-      env.put(name, value);
+      if (!blackListed(name)) {
+        // String value = (String)en.getValue(); // does not apply variable
+        // expansion
+        String value = conf.get(name); // does variable expansion
+        name = safeEnvVarName(name);
+        boolean truncate = conf
+            .getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
+        value = safeEnvVarValue(value, name, truncate);
+        env.put(name, value);
+      }
     }
   }
 

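Note the ordering in addJobConfToEnvironment: the blacklist is consulted with the raw configuration key, before safeEnvVarName rewrites it into a legal environment variable name, which is why the default blacklist entries keep their dots. A minimal sketch of that filter-then-sanitize flow (the sanitizer below is an illustrative stand-in; the real safeEnvVarName lives in ScriptOperator):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class EnvFilterSketch {
      // Illustrative stand-in for ScriptOperator.safeEnvVarName.
      static String safeEnvVarName(String name) {
        return name.replaceAll("[^A-Za-z0-9_]", "_");
      }

      public static void main(String[] args) {
        Set<String> blacklist = new HashSet<String>(Arrays.asList(
            "hive.txn.valid.txns", "hive.script.operator.env.blacklist"));
        Map<String, String> conf = new HashMap<String, String>();
        conf.put("hive.txn.valid.txns", "C98765:...");  // potentially huge
        conf.put("mapred.job.name", "my query");
        Map<String, String> env = new HashMap<String, String>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
          if (!blacklist.contains(e.getKey())) {        // match on the raw key
            env.put(safeEnvVarName(e.getKey()), e.getValue());
          }
        }
        System.out.println(env);  // {mapred_job_name=my query}
      }
    }
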
Added: hive/trunk/ql/src/test/queries/clientpositive/transform_acid.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/transform_acid.q?rev=1633455&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/transform_acid.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/transform_acid.q Tue Oct 21 20:41:16 2014
@@ -0,0 +1,13 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.enforce.bucketing=true;
+
+-- EXCLUDE_OS_WINDOWS
+
+create table transform_acid(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+insert into table transform_acid select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 1;
+
+
+ADD FILE ../../ql/src/test/scripts/transform_acid_grep.sh;
+
+SELECT transform(*) USING 'transform_acid_grep.sh' AS (col string) FROM transform_acid;

Added: hive/trunk/ql/src/test/results/clientpositive/transform_acid.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/transform_acid.q.out?rev=1633455&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/transform_acid.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/transform_acid.q.out Tue Oct 21 20:41:16 2014
@@ -0,0 +1,31 @@
+PREHOOK: query: -- EXCLUDE_OS_WINDOWS
+
+create table transform_acid(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@transform_acid
+POSTHOOK: query: -- EXCLUDE_OS_WINDOWS
+
+create table transform_acid(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@transform_acid
+PREHOOK: query: insert into table transform_acid select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@transform_acid
+POSTHOOK: query: insert into table transform_acid select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@transform_acid
+POSTHOOK: Lineage: transform_acid.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: transform_acid.b EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: SELECT transform(*) USING 'transform_acid_grep.sh' AS (col string) FROM transform_acid
+PREHOOK: type: QUERY
+PREHOOK: Input: default@transform_acid
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT transform(*) USING 'transform_acid_grep.sh' AS (col string) FROM transform_acid
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@transform_acid
+#### A masked pattern was here ####
+a

Added: hive/trunk/ql/src/test/scripts/transform_acid_grep.sh
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/scripts/transform_acid_grep.sh?rev=1633455&view=auto
==============================================================================
--- hive/trunk/ql/src/test/scripts/transform_acid_grep.sh (added)
+++ hive/trunk/ql/src/test/scripts/transform_acid_grep.sh Tue Oct 21 20:41:16 2014
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+env | grep hive.txn.valid.txns
+echo a
+exit 0
+
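
A note on the verification in transform_acid_grep.sh: grep treats each '.' in the pattern as a wildcard, so hive.txn.valid.txns also matches the underscore-mangled form hive_txn_valid_txns that the env-var name sanitizer would produce. With the blacklist in effect the grep prints nothing, and the lone "a" from the final echo is exactly what transform_acid.q.out expects.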