You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2015/01/22 20:16:38 UTC
svn commit: r1654014 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
metadata/PartitionIterable.java parse/EximUtil.java
parse/ExportSemanticAnalyzer.java
Author: gates
Date: Thu Jan 22 19:16:37 2015
New Revision: 1654014
URL: http://svn.apache.org/r1654014
Log:
HIVE-9359 Export of a large table causes OOM in Metastore and Client
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java?rev=1654014&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java Thu Jan 22 19:16:37 2015
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * PartitionIterable - effectively a lazy Iterable<Partition>
+ *
+ * Sometimes, we have a need for iterating through a list of partitions,
+ * but the list of partitions can be too big to fetch as a single object.
+ * Thus, the goal of PartitionIterable is to act as an Iterable<Partition>
+ * while lazily fetching each relevant partition, one after the other as
+ * independent metadata calls.
+ *
+ * It is very likely that any calls to PartitionIterable are going to result
+ * in a large number of calls, so use sparingly only when the memory cost
+ * of fetching all the partitions in one shot is too prohibitive.
+ *
+ * This is still pretty costly in that it would retain a list of partition
+ * names, but that should be far less expensive than the entire partition
+ * objects.
+ *
+ * Note that remove() is an illegal call on this, and will result in an
+ * IllegalStateException.
+ */
+public class PartitionIterable implements Iterable<Partition> {
+
+ @Override
+ public Iterator<Partition> iterator() {
+ return new Iterator<Partition>(){
+
+ private boolean initialized = false;
+ private Iterator<Partition> ptnsIterator = null;
+
+ private Iterator<String> partitionNamesIter = null;
+ private Iterator<Partition> batchIter = null;
+
+ private void initialize(){
+ if(!initialized){
+ if (currType == Type.LIST_PROVIDED){
+ ptnsIterator = ptnsProvided.iterator();
+ } else {
+ partitionNamesIter = partitionNames.iterator();
+ }
+ initialized = true;
+ }
+ }
+
+ public boolean hasNext() {
+ initialize();
+ if (currType == Type.LIST_PROVIDED){
+ return ptnsIterator.hasNext();
+ } else {
+ return ((batchIter != null) && batchIter.hasNext()) || partitionNamesIter.hasNext();
+ }
+ }
+
+ @Override
+ public Partition next() {
+ initialize();
+ if (currType == Type.LIST_PROVIDED){
+ return ptnsIterator.next();
+ }
+
+ if ((batchIter == null) || !batchIter.hasNext()){
+ getNextBatch();
+ }
+
+ return batchIter.next();
+ }
+
+ private void getNextBatch() {
+ int batch_counter = 0;
+ List<String> nameBatch = new ArrayList<String>();
+ while (batch_counter < batch_size && partitionNamesIter.hasNext()){
+ nameBatch.add(partitionNamesIter.next());
+ batch_counter++;
+ }
+ try {
+ batchIter = db.getPartitionsByNames(table,nameBatch).iterator();
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new IllegalStateException(
+ "PartitionIterable is a read-only iterable and remove() is unsupported");
+ }
+ };
+ }
+
+ enum Type {
+ LIST_PROVIDED, // Where a List<Partitions is already provided
+ LAZY_FETCH_PARTITIONS // Where we want to fetch Partitions lazily when they're needed.
+ };
+
+ final Type currType;
+
+ // used for LIST_PROVIDED cases
+ private List<Partition> ptnsProvided = null;
+
+ // used for LAZY_FETCH_PARTITIONS cases
+ private Hive db = null;
+ private Table table = null;
+ private Map<String, String> partialPartitionSpec = null;
+ private List<String> partitionNames = null;
+ private int batch_size;
+
+ /**
+ * Dummy constructor, which simply acts as an iterator on an already-present
+ * list of partitions, allows for easy drop-in replacement for other methods
+ * that already have a List<Partition>
+ */
+ public PartitionIterable(List<Partition> ptnsProvided){
+ this.currType = Type.LIST_PROVIDED;
+ this.ptnsProvided = ptnsProvided;
+ }
+
+ /**
+ * Primary constructor that fetches all partitions in a given table, given
+ * a Hive object and a table object, and a partial partition spec.
+ */
+ public PartitionIterable(Hive db, Table table, Map<String, String> partialPartitionSpec,
+ int batch_size) throws HiveException {
+ this.currType = Type.LAZY_FETCH_PARTITIONS;
+ this.db = db;
+ this.table = table;
+ this.partialPartitionSpec = partialPartitionSpec;
+ this.batch_size = batch_size;
+
+ if (this.partialPartitionSpec == null){
+ partitionNames = db.getPartitionNames(
+ table.getDbName(),table.getTableName(), (short) -1);
+ } else {
+ partitionNames = db.getPartitionNames(
+ table.getDbName(),table.getTableName(),partialPartitionSpec,(short)-1);
+ }
+ }
+
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java?rev=1654014&r1=1654013&r2=1654014&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java Thu Jan 22 19:16:37 2015
@@ -47,6 +47,8 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -168,37 +170,52 @@ public class EximUtil {
public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null;
+ /**
+ * Writes the metadata of an export (the table and, optionally, its partitions)
+ * as a single JSON document.
+ *
+ * @param fs file system on which metadataPath is created
+ * @param metadataPath file to create and write
+ * @param tableHandle table whose Thrift definition is serialized under "table"
+ * @param partitions partitions serialized under "partitions", or null for an
+ * empty array; iterated exactly once, so it may fetch partitions lazily
+ * @throws SemanticException if Thrift serialization fails
+ * @throws IOException if creating or writing the file fails
+ */
public static void createExportDump(FileSystem fs, Path metadataPath, org.apache.hadoop.hive.ql.metadata.Table tableHandle,
- List<org.apache.hadoop.hive.ql.metadata.Partition> partitions) throws SemanticException, IOException {
+ Iterable<org.apache.hadoop.hive.ql.metadata.Partition> partitions) throws SemanticException, IOException {
+ // Stream the JSON out rather than building it in memory first, so a lazily
+ // fetched partition Iterable never has to be materialized all at once.
+ OutputStream out = fs.create(metadataPath);
+ JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out);
try {
- JSONObject jsonContainer = new JSONObject();
- jsonContainer.put("version", METADATA_FORMAT_VERSION);
- if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
- jsonContainer.put("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
- }
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
- try {
- String tableDesc = serializer.toString(tableHandle.getTTable(), "UTF-8");
- jsonContainer.put("table", tableDesc);
- JSONArray jsonPartitions = new JSONArray();
- if (partitions != null) {
- for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
- String partDesc = serializer.toString(partition.getTPartition(), "UTF-8");
- jsonPartitions.put(partDesc);
- }
+ jgen.writeStartObject();
+ jgen.writeStringField("version",METADATA_FORMAT_VERSION);
+ if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
+ jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
+ }
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ try {
+ jgen.writeStringField("table", serializer.toString(tableHandle.getTTable(), "UTF-8"));
+ jgen.writeFieldName("partitions");
+ jgen.writeStartArray();
+ if (partitions != null) {
+ for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
+ jgen.writeString(serializer.toString(partition.getTPartition(), "UTF-8"));
+ jgen.flush();
}
- jsonContainer.put("partitions", jsonPartitions);
- } catch (TException e) {
- throw new SemanticException(
- ErrorMsg.GENERIC_ERROR
- .getMsg("Exception while serializing the metastore objects"), e);
+ }
+ jgen.writeEndArray();
+ } catch (TException e) {
+ throw new SemanticException(
+ ErrorMsg.GENERIC_ERROR
+ .getMsg("Exception while serializing the metastore objects"), e);
}
- OutputStream out = fs.create(metadataPath);
- out.write(jsonContainer.toString().getBytes("UTF-8"));
- out.close();
-
- } catch (JSONException e) {
- throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg("Error in serializing metadata"), e);
+ jgen.writeEndObject();
+ } finally {
+ // Runs on the failure paths too, so the stream is not leaked. The generator
+ // owns the OutputStream, so closing it also closes out.
+ jgen.close();
}
}
public static Map.Entry<Table, List<Partition>>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java?rev=1654014&r1=1654013&r2=1654014&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java Thu Jan 22 19:16:37 2015
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.parse;
+import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
@@ -81,11 +83,13 @@ public class ExportSemanticAnalyzer exte
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e);
}
- List<Partition> partitions = null;
+ PartitionIterable partitions = null;
try {
- partitions = null;
if (ts.tableHandle.isPartitioned()) {
- partitions = (ts.partitions != null) ? ts.partitions : db.getPartitions(ts.tableHandle);
+ partitions = (ts.partitions != null) ?
+ new PartitionIterable(ts.partitions) :
+ new PartitionIterable(db,ts.tableHandle,null,conf.getIntVar(
+ HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
}
Path path = new Path(ctx.getLocalTmpPath(), "_metadata");
EximUtil.createExportDump(FileSystem.getLocal(conf), path, ts.tableHandle, partitions);