You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/11/01 21:01:39 UTC
svn commit: r1636027 - in /hive/trunk:
itests/util/src/main/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/
Author: thejas
Date: Sat Nov 1 20:01:39 2014
New Revision: 1636027
URL: http://svn.apache.org/r1636027
Log:
HIVE-8688 : serialized plan OutputStream is not being closed (Thejas Nair, reviewed by Jason Dere)
Modified:
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1636027&r1=1636026&r2=1636027&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Sat Nov 1 20:01:39 2014
@@ -952,7 +952,7 @@ public class QTestUtil {
for (Task<? extends Serializable> plan : tasks) {
Utilities.serializePlan(plan, ofs, conf);
}
-
+ ofs.close();
fixXml4JDK7(outf.getPath());
maskPatterns(xmlPlanMask, outf.getPath());
@@ -964,6 +964,7 @@ public class QTestUtil {
return exitVal;
} finally {
conf.set(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo");
+ IOUtils.closeQuietly(ofs);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1636027&r1=1636026&r2=1636027&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Nov 1 20:01:39 2014
@@ -18,11 +18,67 @@
package org.apache.hadoop.hive.ql.exec;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.PersistenceDelegate;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
import org.antlr.runtime.CommonToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
@@ -126,66 +182,11 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.ExceptionListener;
-import java.beans.Expression;
-import java.beans.PersistenceDelegate;
-import java.beans.Statement;
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLTransientException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
/**
* Utilities.
@@ -653,21 +654,33 @@ public final class Utilities {
Path planPath = getPlanPath(conf, name);
- OutputStream out;
+ OutputStream out = null;
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
// add it to the conf
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
- out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED));
- serializePlan(w, out, conf);
+ try {
+ out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED));
+ serializePlan(w, out, conf);
+ out.close();
+ out = null;
+ } finally {
+ IOUtils.closeStream(out);
+ }
LOG.info("Setting plan: "+planPath.toUri().getPath());
conf.set(planPath.toUri().getPath(),
Base64.encodeBase64String(byteOut.toByteArray()));
} else {
// use the default file system of the conf
FileSystem fs = planPath.getFileSystem(conf);
- out = fs.create(planPath);
- serializePlan(w, out, conf);
+ try {
+ out = fs.create(planPath);
+ serializePlan(w, out, conf);
+ out.close();
+ out = null;
+ } finally {
+ IOUtils.closeStream(out);
+ }
// Serialize the plan to the default hdfs instance
// Except for hadoop local mode execution where we should be
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1636027&r1=1636026&r2=1636027&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Sat Nov 1 20:01:39 2014
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
@@ -38,8 +39,8 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -170,10 +171,18 @@ public class MapRedTask extends ExecDriv
// write out the plan to a local file
Path planPath = new Path(ctx.getLocalTmpPath(), "plan.xml");
- OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializePlan(plan, out, conf);
+
+ OutputStream out = null;
+ try {
+ out = FileSystem.getLocal(conf).create(planPath);
+ Utilities.serializePlan(plan, out, conf);
+ out.close();
+ out = null;
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
String isSilent = "true".equalsIgnoreCase(System
.getProperty("test.silent")) ? "-nolog" : "";
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1636027&r1=1636026&r2=1636027&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Sat Nov 1 20:01:39 2014
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -151,10 +152,19 @@ public class MapredLocalTask extends Tas
conf.setVar(ConfVars.HIVEADDEDJARS, Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR));
// write out the plan to a local file
Path planPath = new Path(ctx.getLocalTmpPath(), "plan.xml");
- OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredLocalWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializePlan(plan, out, conf);
+
+ OutputStream out = null;
+ try {
+ out = FileSystem.getLocal(conf).create(planPath);
+ Utilities.serializePlan(plan, out, conf);
+ out.close();
+ out = null;
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+
String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";