You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2015/01/22 20:16:38 UTC

svn commit: r1654014 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: metadata/PartitionIterable.java parse/EximUtil.java parse/ExportSemanticAnalyzer.java

Author: gates
Date: Thu Jan 22 19:16:37 2015
New Revision: 1654014

URL: http://svn.apache.org/r1654014
Log:
HIVE-9359 Export of a large table causes OOM in Metastore and Client

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java?rev=1654014&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java Thu Jan 22 19:16:37 2015
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * PartitionIterable - effectively a lazy Iterable<Partition>
+ *
+ * Sometimes, we have a need for iterating through a list of partitions,
+ * but the list of partitions can be too big to fetch as a single object.
+ * Thus, the goal of PartitionIterable is to act as an Iterable<Partition>
+ * while lazily fetching each relevant partition, one after the other as
+ * independent metadata calls.
+ *
+ * It is very likely that any calls to PartitionIterable are going to result
+ * in a large number of calls, so use sparingly only when the memory cost
+ * of fetching all the partitions in one shot is too prohibitive.
+ *
+ * This is still pretty costly in that it would retain a list of partition
+ * names, but that should be far less expensive than the entire partition
+ * objects.
+ *
+ * Note that remove() is an illegal call on this, and will result in an
+ * IllegalStateException.
+ */
+public class PartitionIterable implements Iterable<Partition> {
+
+  @Override
+  public Iterator<Partition> iterator() {
+    return new Iterator<Partition>(){
+
+      private boolean initialized = false;
+      private Iterator<Partition> ptnsIterator = null;
+
+      private Iterator<String> partitionNamesIter = null;
+      private Iterator<Partition> batchIter = null;
+
+      private void initialize(){
+        if(!initialized){
+          if (currType == Type.LIST_PROVIDED){
+            ptnsIterator = ptnsProvided.iterator();
+          } else {
+            partitionNamesIter = partitionNames.iterator();
+          }
+          initialized = true;
+        }
+      }
+
+      public boolean hasNext() {
+        initialize();
+        if (currType == Type.LIST_PROVIDED){
+          return ptnsIterator.hasNext();
+        } else {
+          return ((batchIter != null) && batchIter.hasNext()) || partitionNamesIter.hasNext();
+        }
+      }
+
+      @Override
+      public Partition next() {
+        initialize();
+        if (currType == Type.LIST_PROVIDED){
+          return ptnsIterator.next();
+        }
+
+        if ((batchIter == null) || !batchIter.hasNext()){
+          getNextBatch();
+        }
+
+        return batchIter.next();
+      }
+
+      private void getNextBatch() {
+        int batch_counter = 0;
+        List<String> nameBatch = new ArrayList<String>();
+        while (batch_counter < batch_size && partitionNamesIter.hasNext()){
+          nameBatch.add(partitionNamesIter.next());
+          batch_counter++;
+        }
+        try {
+          batchIter = db.getPartitionsByNames(table,nameBatch).iterator();
+        } catch (HiveException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new IllegalStateException(
+            "PartitionIterable is a read-only iterable and remove() is unsupported");
+      }
+    };
+  }
+
+  enum Type {
+    LIST_PROVIDED,  // Where a List<Partitions is already provided
+    LAZY_FETCH_PARTITIONS // Where we want to fetch Partitions lazily when they're needed.
+  };
+
+  final Type currType;
+
+  // used for LIST_PROVIDED cases
+  private List<Partition> ptnsProvided = null;
+
+  // used for LAZY_FETCH_PARTITIONS cases
+  private Hive db = null;
+  private Table table = null;
+  private Map<String, String> partialPartitionSpec = null;
+  private List<String> partitionNames = null;
+  private int batch_size;
+
+  /**
+   * Dummy constructor, which simply acts as an iterator on an already-present
+   * list of partitions, allows for easy drop-in replacement for other methods
+   * that already have a List<Partition>
+   */
+  public PartitionIterable(List<Partition> ptnsProvided){
+    this.currType = Type.LIST_PROVIDED;
+    this.ptnsProvided = ptnsProvided;
+  }
+
+  /**
+   * Primary constructor that fetches all partitions in a given table, given
+   * a Hive object and a table object, and a partial partition spec.
+   */
+  public PartitionIterable(Hive db, Table table, Map<String, String> partialPartitionSpec,
+      int batch_size) throws HiveException {
+    this.currType = Type.LAZY_FETCH_PARTITIONS;
+    this.db = db;
+    this.table = table;
+    this.partialPartitionSpec = partialPartitionSpec;
+    this.batch_size = batch_size;
+
+    if (this.partialPartitionSpec == null){
+      partitionNames = db.getPartitionNames(
+          table.getDbName(),table.getTableName(), (short) -1);
+    } else {
+      partitionNames = db.getPartitionNames(
+          table.getDbName(),table.getTableName(),partialPartitionSpec,(short)-1);
+    }
+  }
+
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java?rev=1654014&r1=1654013&r2=1654014&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java Thu Jan 22 19:16:37 2015
@@ -47,6 +47,8 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TJSONProtocol;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -168,37 +170,37 @@ public class EximUtil {
   public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null;
 
   public static void createExportDump(FileSystem fs, Path metadataPath, org.apache.hadoop.hive.ql.metadata.Table tableHandle,
-      List<org.apache.hadoop.hive.ql.metadata.Partition> partitions) throws SemanticException, IOException {
+      Iterable<org.apache.hadoop.hive.ql.metadata.Partition> partitions) throws SemanticException, IOException {
+    OutputStream out = fs.create(metadataPath);
+    JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out);
+    jgen.writeStartObject();
+    jgen.writeStringField("version",METADATA_FORMAT_VERSION);
+    if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
+      jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
+    }
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     try {
-      JSONObject jsonContainer = new JSONObject();
-      jsonContainer.put("version", METADATA_FORMAT_VERSION);
-      if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
-        jsonContainer.put("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
-      }
-      TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-      try {
-        String tableDesc = serializer.toString(tableHandle.getTTable(), "UTF-8");
-        jsonContainer.put("table", tableDesc);
-        JSONArray jsonPartitions = new JSONArray();
-        if (partitions != null) {
-          for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
-            String partDesc = serializer.toString(partition.getTPartition(), "UTF-8");
-            jsonPartitions.put(partDesc);
-          }
+      jgen.writeStringField("table", serializer.toString(tableHandle.getTTable(), "UTF-8"));
+      jgen.writeFieldName("partitions");
+      jgen.writeStartArray();
+      if (partitions != null) {
+        for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
+          jgen.writeString(serializer.toString(partition.getTPartition(), "UTF-8"));
+          jgen.flush();
         }
-        jsonContainer.put("partitions", jsonPartitions);
-      } catch (TException e) {
-        throw new SemanticException(
-            ErrorMsg.GENERIC_ERROR
-                .getMsg("Exception while serializing the metastore objects"), e);
       }
-      OutputStream out = fs.create(metadataPath);
-      out.write(jsonContainer.toString().getBytes("UTF-8"));
-      out.close();
-
-    } catch (JSONException e) {
-      throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg("Error in serializing metadata"), e);
+      jgen.writeEndArray();
+    } catch (TException e) {
+      throw new SemanticException(
+          ErrorMsg.GENERIC_ERROR
+              .getMsg("Exception while serializing the metastore objects"), e);
     }
+    jgen.writeEndObject();
+    jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
+  }
+
+  private static void write(OutputStream out, String s) throws IOException {
+    out.write(s.getBytes("UTF-8"));
   }
 
   public static Map.Entry<Table, List<Partition>>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java?rev=1654014&r1=1654013&r2=1654014&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java Thu Jan 22 19:16:37 2015
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -81,11 +83,13 @@ public class ExportSemanticAnalyzer exte
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e);
     }
 
-    List<Partition> partitions = null;
+    PartitionIterable partitions = null;
     try {
-      partitions = null;
       if (ts.tableHandle.isPartitioned()) {
-        partitions = (ts.partitions != null) ? ts.partitions : db.getPartitions(ts.tableHandle);
+        partitions = (ts.partitions != null) ?
+            new PartitionIterable(ts.partitions) :
+            new PartitionIterable(db,ts.tableHandle,null,conf.getIntVar(
+                HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
       }
       Path path = new Path(ctx.getLocalTmpPath(), "_metadata");
       EximUtil.createExportDump(FileSystem.getLocal(conf), path, ts.tableHandle, partitions);