Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/11/11 02:50:18 UTC
svn commit: r712905 [3/38] - in /hadoop/core/trunk: ./ src/contrib/hive/
src/contrib/hive/cli/src/java/org/apache/hadoop/hive/cli/
src/contrib/hive/common/src/java/org/apache/hadoop/hive/conf/
src/contrib/hive/conf/ src/contrib/hive/data/files/ src/con...
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Mon Nov 10 17:50:06 2008
@@ -20,10 +20,11 @@
import java.io.*;
import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -34,7 +35,12 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
* Reduce Sink Operator sends output to the reduce stage
@@ -42,15 +48,31 @@
public class ReduceSinkOperator extends TerminalOperator <reduceSinkDesc> implements Serializable {
private static final long serialVersionUID = 1L;
+
+ /**
+ * The evaluators for the key columns.
+ * Key columns decide the sort order on the reducer side.
+ * Key columns are passed to the reducer in the "key".
+ */
transient protected ExprNodeEvaluator[] keyEval;
+ /**
+ * The evaluators for the value columns.
+ * Value columns are passed to reducer in the "value".
+ */
transient protected ExprNodeEvaluator[] valueEval;
+ /**
+ * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
+ * Partition columns decide the reducer that the current row goes to.
+ * Partition columns are not passed to reducer.
+ */
+ transient protected ExprNodeEvaluator[] partitionEval;
// TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready
transient Serializer keySerializer;
+ transient boolean keyIsText;
transient Serializer valueSerializer;
transient int tag;
transient byte[] tagByte = new byte[1];
- transient int numPartitionFields;
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
@@ -67,6 +89,12 @@
valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
}
+ partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+ i=0;
+ for(exprNodeDesc e: conf.getPartitionCols()) {
+ partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+
tag = conf.getTag();
tagByte[0] = (byte)tag;
LOG.info("Using tag = " + tag);
@@ -74,13 +102,11 @@
tableDesc keyTableDesc = conf.getKeySerializeInfo();
keySerializer = (Serializer)keyTableDesc.getDeserializerClass().newInstance();
keySerializer.initialize(null, keyTableDesc.getProperties());
+ keyIsText = keySerializer.getSerializedClass().equals(Text.class);
tableDesc valueTableDesc = conf.getValueSerializeInfo();
valueSerializer = (Serializer)valueTableDesc.getDeserializerClass().newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- // Set the number of key fields to be used in the partitioner.
- numPartitionFields = conf.getNumPartitionFields();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -89,7 +115,7 @@
transient InspectableObject tempInspectableObject = new InspectableObject();
transient HiveKey keyWritable = new HiveKey();
- transient Text valueText;
+ transient Writable value;
transient ObjectInspector keyObjectInspector;
transient ObjectInspector valueObjectInspector;
@@ -97,64 +123,138 @@
transient ArrayList<ObjectInspector> valueFieldsObjectInspectors = new ArrayList<ObjectInspector>();
public void process(Object row, ObjectInspector rowInspector) throws HiveException {
- // TODO: use DynamicSerDe when that is ready
try {
- // Generate hashCode for the tuple
- int keyHashCode = 0;
- if (numPartitionFields == -1) {
- keyHashCode = (int)(Math.random() * Integer.MAX_VALUE);
- }
+ // Evaluate the keys
ArrayList<Object> keys = new ArrayList<Object>(keyEval.length);
for(ExprNodeEvaluator e: keyEval) {
e.evaluate(row, rowInspector, tempInspectableObject);
keys.add(tempInspectableObject.o);
- if (numPartitionFields == keys.size()) {
- keyHashCode = keys.hashCode();
- }
+ // Construct the keyObjectInspector from the first row
if (keyObjectInspector == null) {
keyFieldsObjectInspectors.add(tempInspectableObject.oi);
}
}
- if (numPartitionFields > keys.size()) {
- keyHashCode = keys.hashCode();
- }
+ // Construct the keyObjectInspector from the first row
if (keyObjectInspector == null) {
keyObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
ObjectInspectorUtils.getIntegerArray(keyFieldsObjectInspectors.size()),
keyFieldsObjectInspectors);
}
- Text key = (Text)keySerializer.serialize(keys, keyObjectInspector);
- if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
+ // Serialize the keys and append the tag
+ if (keyIsText) {
+ Text key = (Text)keySerializer.serialize(keys, keyObjectInspector);
+ if (tag == -1) {
+ keyWritable.set(key.getBytes(), 0, key.getLength());
+ } else {
+ int keyLength = key.getLength();
+ keyWritable.setSize(keyLength+1);
+ System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+ keyWritable.get()[keyLength] = tagByte[0];
+ }
} else {
- int keyLength = key.getLength();
- keyWritable.setSize(keyLength+1);
- System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = tagByte[0];
+ // Must be BytesWritable
+ BytesWritable key = (BytesWritable)keySerializer.serialize(keys, keyObjectInspector);
+ if (tag == -1) {
+ keyWritable.set(key.get(), 0, key.getSize());
+ } else {
+ int keyLength = key.getSize();
+ keyWritable.setSize(keyLength+1);
+ System.arraycopy(key.get(), 0, keyWritable.get(), 0, keyLength);
+ keyWritable.get()[keyLength] = tagByte[0];
+ }
+ }
+ // Set the HashCode
+ int keyHashCode = 0;
+ for(ExprNodeEvaluator e: partitionEval) {
+ e.evaluate(row, rowInspector, tempInspectableObject);
+ keyHashCode = keyHashCode * 31
+ + (tempInspectableObject.o == null ? 0 : tempInspectableObject.o.hashCode());
}
keyWritable.setHashCode(keyHashCode);
+ // Evaluate the value
ArrayList<Object> values = new ArrayList<Object>(valueEval.length);
for(ExprNodeEvaluator e: valueEval) {
e.evaluate(row, rowInspector, tempInspectableObject);
values.add(tempInspectableObject.o);
+ // Construct the valueObjectInspector from the first row
if (valueObjectInspector == null) {
valueFieldsObjectInspectors.add(tempInspectableObject.oi);
}
}
+ // Construct the valueObjectInspector from the first row
if (valueObjectInspector == null) {
valueObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
ObjectInspectorUtils.getIntegerArray(valueFieldsObjectInspectors.size()),
valueFieldsObjectInspectors);
}
- valueText = (Text)valueSerializer.serialize(values, valueObjectInspector);
+ // Serialize the value
+ value = valueSerializer.serialize(values, valueObjectInspector);
} catch (SerDeException e) {
throw new HiveException(e);
}
+
try {
- out.collect(keyWritable, valueText);
+ out.collect(keyWritable, value);
} catch (IOException e) {
throw new HiveException (e);
}
}
+
+ public List<String> genColLists(HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx)
+ throws SemanticException {
+ RowResolver redSinkRR = opParseCtx.get(this).getRR();
+ List<String> childColLists = new ArrayList<String>();
+
+ for(Operator<? extends Serializable> o: childOperators)
+ childColLists = Utilities.mergeUniqElems(childColLists, o.genColLists(opParseCtx));
+
+ List<String> colLists = new ArrayList<String>();
+ ArrayList<exprNodeDesc> keys = conf.getKeyCols();
+ for (exprNodeDesc key : keys)
+ colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+
+ // In case of extract child, see the columns used and propagate them
+ if ((childOperators.size() == 1) && (childOperators.get(0) instanceof ExtractOperator)) {
+ assert parentOperators.size() == 1;
+ Operator<? extends Serializable> par = parentOperators.get(0);
+ RowResolver parRR = opParseCtx.get(par).getRR();
+
+ for (String childCol : childColLists) {
+ String [] nm = redSinkRR.reverseLookup(childCol);
+ ColumnInfo cInfo = parRR.get(nm[0],nm[1]);
+ if (!colLists.contains(cInfo.getInternalName()))
+ colLists.add(cInfo.getInternalName());
+ }
+ }
+ else if ((childOperators.size() == 1) && (childOperators.get(0) instanceof JoinOperator)) {
+ assert parentOperators.size() == 1;
+ Operator<? extends Serializable> par = parentOperators.get(0);
+ RowResolver parRR = opParseCtx.get(par).getRR();
+ RowResolver childRR = opParseCtx.get(childOperators.get(0)).getRR();
+
+ for (String childCol : childColLists) {
+ String [] nm = childRR.reverseLookup(childCol);
+ ColumnInfo cInfo = redSinkRR.get(nm[0],nm[1]);
+ if (cInfo != null) {
+ cInfo = parRR.get(nm[0], nm[1]);
+ if (!colLists.contains(cInfo.getInternalName()))
+ colLists.add(cInfo.getInternalName());
+ }
+ }
+ }
+ else {
+
+ // Reduce Sink contains the columns needed - no need to aggregate from children
+ ArrayList<exprNodeDesc> vals = conf.getValueCols();
+ for (exprNodeDesc val : vals)
+ colLists = Utilities.mergeUniqElems(colLists, val.getCols());
+ }
+
+ OpParseContext ctx = opParseCtx.get(this);
+ ctx.setColNames(colLists);
+ opParseCtx.put(this, ctx);
+ return colLists;
+ }
+
}
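
For readers following the new partitioning logic, here is a minimal standalone sketch of how the loop over partitionEval above turns a row's DISTRIBUTE BY / CLUSTER BY values into a reducer choice. It is illustrative only; the class and method names are made up, and the modulo step mirrors the default HashPartitioner behaviour rather than anything added in this patch.

    import java.util.Arrays;
    import java.util.List;

    public class PartitionHashSketch {
        // Combine the evaluated partition-column values into one hash code,
        // exactly as ReduceSinkOperator.process() now does before calling
        // keyWritable.setHashCode().
        static int partitionHash(List<Object> partitionValues) {
            int hash = 0;
            for (Object o : partitionValues) {
                hash = hash * 31 + (o == null ? 0 : o.hashCode());
            }
            return hash;
        }

        // A HashPartitioner-style reducer choice derived from that hash.
        static int reducerFor(List<Object> partitionValues, int numReducers) {
            return (partitionHash(partitionValues) & Integer.MAX_VALUE) % numReducers;
        }

        public static void main(String[] args) {
            List<Object> row = Arrays.<Object>asList("user_42", 7, null);
            System.out.println("hash = " + partitionHash(row)
                + ", reducer = " + reducerFor(row, 32));
        }
    }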
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Nov 10 17:50:06 2008
@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.LineRecordReader.LineReader;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FileUtil;
public class ScriptOperator extends Operator<scriptDesc> implements Serializable {
@@ -89,6 +90,77 @@
}
}
+
+ /**
+ * Maps a relative pathname to an absolute pathname using the
+ * PATH enviroment.
+   * PATH environment variable.
+ public class PathFinder
+ {
+ String pathenv; // a string of pathnames
+    String pathSep;    // the path separator
+    String fileSep;    // the file separator in a directory
+
+ /**
+ * Construct a PathFinder object using the path from
+ * the specified system environment variable.
+ */
+ public PathFinder(String envpath)
+ {
+ pathenv = System.getenv(envpath);
+ pathSep = System.getProperty("path.separator");
+ fileSep = System.getProperty("file.separator");
+ }
+
+ /**
+     * Prepends the specified component to the path list
+ */
+ public void prependPathComponent(String str)
+ {
+ pathenv = str + pathSep + pathenv;
+ }
+
+ /**
+ * Returns the full path name of this file if it is listed in the
+ * path
+ */
+ public File getAbsolutePath(String filename)
+ {
+ if (pathenv == null || pathSep == null || fileSep == null) {
+ return null;
+ }
+ int val = -1;
+ String classvalue = pathenv + pathSep;
+
+ while (((val = classvalue.indexOf(pathSep)) >= 0) &&
+ classvalue.length() > 0) {
+ //
+ // Extract each entry from the pathenv
+ //
+ String entry = classvalue.substring(0, val).trim();
+ File f = new File(entry);
+
+ try {
+ if (f.isDirectory()) {
+ //
+ // this entry in the pathenv is a directory.
+ // see if the required file is in this directory
+ //
+ f = new File(entry + fileSep + filename);
+ }
+ //
+ // see if the filename matches and we can read it
+ //
+ if (f.isFile() && f.canRead()) {
+ return f;
+ }
+ } catch (Exception exp){ }
+ classvalue = classvalue.substring(val+1).trim();
+ }
+ return null;
+ }
+ }
+
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
@@ -104,6 +176,20 @@
scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo().getProperties());
String [] cmdArgs = splitArgs(conf.getScriptCmd());
+
+ String prog = cmdArgs[0];
+ File currentDir = new File(".").getAbsoluteFile();
+
+ if (!new File(prog).isAbsolute()) {
+ PathFinder finder = new PathFinder("PATH");
+ finder.prependPathComponent(currentDir.toString());
+ File f = finder.getAbsolutePath(prog);
+ if (f != null) {
+ cmdArgs[0] = f.getAbsolutePath();
+ }
+ f = null;
+ }
+
String [] wrappedCmdArgs = addWrapper(cmdArgs);
LOG.info("Executing " + Arrays.asList(wrappedCmdArgs));
LOG.info("tablename=" + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
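
The PathFinder class added above boils down to a PATH-style lookup. A small standalone sketch, assuming only the JDK (the class name here is made up):

    import java.io.File;

    public class ResolveOnPathSketch {
        // Resolve a relative command name against the current directory and
        // every entry of the PATH environment variable, much as ScriptOperator
        // now does before launching the user script.
        static File resolve(String filename) {
            String pathenv = "." + File.pathSeparator + System.getenv("PATH");
            for (String entry : pathenv.split(File.pathSeparator)) {
                File f = new File(entry.trim(), filename);
                if (f.isFile() && f.canRead()) {
                    return f;            // first readable match wins
                }
            }
            return null;                 // caller keeps the original name
        }

        public static void main(String[] args) {
            System.out.println(resolve(args.length > 0 ? args[0] : "sort"));
        }
    }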
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Mon Nov 10 17:50:06 2008
@@ -20,14 +20,18 @@
import java.io.*;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.ql.plan.selectDesc;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
* Select operator implementation
@@ -87,4 +91,61 @@
}
forward(output, outputObjectInspector);
}
+
+ private List<String> getColsFromExpr(HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+ List<String> cols = new ArrayList<String>();
+ ArrayList<exprNodeDesc> exprList = conf.getColList();
+ for (exprNodeDesc expr : exprList)
+ cols = Utilities.mergeUniqElems(cols, expr.getCols());
+ List<Integer> listExprs = new ArrayList<Integer>();
+ for (int pos = 0; pos < exprList.size(); pos++)
+ listExprs.add(new Integer(pos));
+ OpParseContext ctx = opParseCtx.get(this);
+ ctx.setColNames(cols);
+ opParseCtx.put(this, ctx);
+ return cols;
+ }
+
+ private List<String> getColsFromExpr(List<String> colList,
+ HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+ if (colList.isEmpty())
+ return getColsFromExpr(opParseCtx);
+
+ List<String> cols = new ArrayList<String>();
+ ArrayList<exprNodeDesc> selectExprs = conf.getColList();
+ List<Integer> listExprs = new ArrayList<Integer>();
+
+ for (String col : colList) {
+ // col is the internal name i.e. position within the expression list
+ Integer pos = new Integer(col);
+ exprNodeDesc expr = selectExprs.get(pos.intValue());
+ cols = Utilities.mergeUniqElems(cols, expr.getCols());
+ listExprs.add(pos);
+ }
+
+ OpParseContext ctx = opParseCtx.get(this);
+ ctx.setColNames(cols);
+ opParseCtx.put(this, ctx);
+ return cols;
+ }
+
+ public List<String> genColLists(HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx)
+ throws SemanticException {
+ List<String> cols = new ArrayList<String>();
+
+ for(Operator<? extends Serializable> o: childOperators) {
+ // if one of my children is a fileSink, return everything
+ if ((o instanceof FileSinkOperator) || (o instanceof ScriptOperator))
+ return getColsFromExpr(opParseCtx);
+
+ cols = Utilities.mergeUniqElems(cols, o.genColLists(opParseCtx));
+ }
+
+ if (conf.isSelectStar())
+ // The input to the select does not matter. Go over the expressions and return the ones which have a marked column
+ return getColsFromExpr(cols, opParseCtx);
+
+ return getColsFromExpr(opParseCtx);
+ }
+
}
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java?rev=712905&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java Mon Nov 10 17:50:06 2008
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.*;
+import java.util.*;
+import java.util.regex.Pattern;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.net.URLDecoder;
+import java.net.MalformedURLException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
+
+/*
+ * Intelligence to make clients wait if the cluster is in a bad state.
+ */
+public class Throttle {
+
+ // The percentage of maximum allocated memory that triggers GC
+  // on the job tracker. This can be overridden through the jobconf.
+ // The default is such that there is no throttling.
+ static private int DEFAULT_MEMORY_GC_PERCENT = 100;
+
+ // sleep this many seconds between each retry.
+  // This can be overridden through the jobconf.
+ static private int DEFAULT_RETRY_PERIOD = 60;
+
+ /**
+   * Fetches http://<jobtracker host>:<info port>/gc.jsp?threshold=<percent> and retries while the JobTracker reports memory pressure.
+ */
+ static void checkJobTracker(JobConf conf, Log LOG) {
+
+ try {
+ byte buffer[] = new byte[1024];
+ int threshold = conf.getInt("mapred.throttle.threshold.percent",
+ DEFAULT_MEMORY_GC_PERCENT);
+ int retry = conf.getInt("mapred.throttle.retry.period",
+ DEFAULT_RETRY_PERIOD);
+
+ // If the threshold is 100 percent, then there is no throttling
+ if (threshold == 100) {
+ return;
+ }
+
+ // find the http port for the jobtracker
+ String infoAddr = conf.get("mapred.job.tracker.http.address");
+ if (infoAddr == null) {
+ throw new IOException("Throttle: Unable to find job tracker info port.");
+ }
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+ int infoPort = infoSocAddr.getPort();
+
+ // This is the Job Tracker URL
+ String tracker = "http://" +
+ JobTracker.getAddress(conf).getHostName() + ":" +
+ infoPort +
+ "/gc.jsp?threshold=" + threshold;
+
+ while (true) {
+ // read in the first 1K characters from the URL
+ URL url = new URL(tracker);
+ LOG.debug("Throttle: URL " + tracker);
+ InputStream in = url.openStream();
+ int numRead = in.read(buffer);
+ in.close();
+        String fetchString = numRead > 0 ? new String(buffer, 0, numRead) : "";
+
+ // fetch the xml tag <dogc>xxx</dogc>
+ Pattern dowait = Pattern.compile("<dogc>",
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+ String[] results = dowait.split(fetchString);
+ if (results.length != 2) {
+ throw new IOException("Throttle: Unable to parse response of URL " + url +
+                              ". GET returned " + fetchString);
+ }
+ dowait = Pattern.compile("</dogc>",
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+ results = dowait.split(results[1]);
+ if (results.length < 1) {
+ throw new IOException("Throttle: Unable to parse response of URL " + url +
+                              ". GET returned " + fetchString);
+ }
+
+ // if the jobtracker signalled that the threshold is not exceeded,
+ // then we return immediately.
+ if (results[0].trim().compareToIgnoreCase("false") == 0) {
+ return;
+ }
+
+ // The JobTracker has exceeded its threshold and is doing a GC.
+ // The client has to wait and retry.
+ LOG.warn("Job is being throttled because of resource crunch on the " +
+               "JobTracker. Will retry in " + retry + " seconds...");
+ Thread.sleep(retry * 1000L);
+ }
+ } catch (Exception e) {
+ LOG.warn("Job is not being throttled. " + e);
+ }
+ }
+}
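
A quick sketch of the <dogc> parsing used above, with a made-up sample response (illustrative only, not from the patch):

    import java.util.regex.Pattern;

    public class DogcParseSketch {
        // Split the fetched page on the <dogc> and </dogc> tags and read the
        // boolean between them, the same way Throttle.checkJobTracker does.
        static boolean shouldWait(String fetchString) {
            Pattern open = Pattern.compile("<dogc>", Pattern.CASE_INSENSITIVE);
            Pattern close = Pattern.compile("</dogc>", Pattern.CASE_INSENSITIVE);
            String[] parts = open.split(fetchString);
            if (parts.length != 2) {
                throw new IllegalArgumentException("no <dogc> tag in: " + fetchString);
            }
            String value = close.split(parts[1])[0].trim();   // "true" or "false"
            return !value.equalsIgnoreCase("false");
        }

        public static void main(String[] args) {
            System.out.println(shouldWait("<html><dogc>false</dogc></html>"));  // false
            System.out.println(shouldWait("<html><dogc>true</dogc></html>"));   // true
        }
    }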
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Nov 10 17:50:06 2008
@@ -140,8 +140,7 @@
// Set up distributed cache
DistributedCache.createSymlink(job);
String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN";
- URI[] fileURIs = new URI[] {new URI(uriWithLink)};
- DistributedCache.setCacheFiles(fileURIs, job);
+ DistributedCache.addCacheFile(new URI(uriWithLink), job);
// Cache the object in this process too so lookups don't hit the file system
synchronized (Utilities.class) {
gWork = w;
@@ -198,15 +197,13 @@
public static tableDesc defaultTd;
static {
// by default we expect ^A separated strings
+ // This tableDesc does not provide column names. We should always use
+ // PlanUtils.getDefaultTableDesc(String separatorCode, String columns)
+ // or getBinarySortableTableDesc(List<FieldSchema> fieldSchemas) when
+ // we know the column names.
defaultTd = PlanUtils.getDefaultTableDesc("" + Utilities.ctrlaCode);
}
- public static tableDesc defaultTabTd;
- static {
- // Default tab-separated tableDesc
- defaultTabTd = PlanUtils.getDefaultTableDesc("" + Utilities.tabCode);
- }
-
public final static int newLineCode = 10;
public final static int tabCode = 9;
public final static int ctrlaCode = 1;
@@ -431,4 +428,43 @@
keyClass, valClass, compressionType, codec));
}
+
+ /**
+ * Shamelessly cloned from GenericOptionsParser
+ */
+ public static String realFile(String newFile, Configuration conf) throws IOException {
+ Path path = new Path(newFile);
+ URI pathURI = path.toUri();
+ FileSystem fs;
+
+ if (pathURI.getScheme() == null) {
+ fs = FileSystem.getLocal(conf);
+ } else {
+ fs = path.getFileSystem(conf);
+ }
+
+ if (!fs.exists(path)) {
+ return null;
+ }
+
+ try {
+ fs.close();
+ } catch(IOException e){};
+
+ return (path.makeQualified(fs).toString());
+ }
+
+ public static List<String> mergeUniqElems(List<String> src, List<String> dest) {
+ if (dest == null) return src;
+ if (src == null) return dest;
+ int pos = 0;
+
+ while (pos < dest.size()) {
+ if (!src.contains(dest.get(pos)))
+ src.add(dest.get(pos));
+ pos++;
+ }
+
+ return src;
+ }
}
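
The new mergeUniqElems helper is used by the column-pruning changes elsewhere in this commit to accumulate needed column names without duplicates. A minimal sketch of its merge semantics (illustrative only; the wrapper class is made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MergeUniqSketch {
        // Append to src every element of dest that src does not already contain.
        static List<String> mergeUniqElems(List<String> src, List<String> dest) {
            if (dest == null) return src;
            if (src == null) return dest;
            for (String s : dest) {
                if (!src.contains(s)) src.add(s);
            }
            return src;
        }

        public static void main(String[] args) {
            List<String> a = new ArrayList<String>(Arrays.asList("0", "1"));
            System.out.println(mergeUniqElems(a, Arrays.asList("1", "2")));   // [0, 1, 2]
        }
    }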
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java?rev=712905&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java Mon Nov 10 17:50:06 2008
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
+
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** An {@link InputFormat} for Plain files with {@link Deserializer} records */
+public class FlatFileInputFormat<T> extends FileInputFormat<Void, FlatFileInputFormat.RowContainer<T>> {
+
+ /**
+ * A work-around until HADOOP-1230 is fixed.
+ *
+   * Allows boolean next(k, v) to be called by reference while still allowing the deserializer to create a new
+   * object (i.e., a row) on every call to next.
+ */
+ static public class RowContainer<T> {
+ T row;
+ }
+
+ /**
+   * An implementation of SerializationContext is responsible for looking up the Serialization implementation
+   * for the given RecordReader, potentially based on the Configuration or some other mechanism.
+   *
+   * The SerializationFactory does not provide this functionality because:
+   * 1. It requires Serialization implementations to be specified in the Configuration a priori (although that is
+   *    the same as setting a SerializationContext).
+   * 2. It does not look up the actual subclass being deserialized; e.g., for Serializable there is no way of
+   *    configuring the actual Java class being serialized/deserialized.
+ */
+ static public interface SerializationContext<S> extends Configurable {
+
+ /**
+     * A {@link Serialization} object for objects of type S
+ * @return a serialization object for this context
+ */
+ public Serialization<S> getSerialization() throws IOException;
+
+ /**
+ * Produces the specific class to deserialize
+ */
+ public Class<? extends S> getRealClass() throws IOException;
+ }
+
+ /**
+   * The JobConf key for the Serialization implementation
+ */
+ static public final String SerializationImplKey = "mapred.input.serialization.implKey";
+
+ /**
+ * An implementation of {@link SerializationContext} that reads the Serialization class and
+ * specific subclass to be deserialized from the JobConf.
+ *
+ */
+ static public class SerializationContextFromConf<S> implements FlatFileInputFormat.SerializationContext<S> {
+
+ /**
+     * The JobConf key for the Class that is being deserialized.
+ */
+ static public final String SerializationSubclassKey = "mapred.input.serialization.subclassKey";
+
+ /**
+     * Implements Configurable so it can use the configuration to find the right classes.
+     * Note: ReflectionUtils will automatically call setConf with the right configuration.
+ */
+ private Configuration conf;
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * @return the actual class being deserialized
+ * @exception does not currently throw IOException
+ */
+ public Class<S> getRealClass() throws IOException {
+ return (Class<S>)conf.getClass(SerializationSubclassKey, null, Object.class);
+ }
+
+ /**
+ * Looks up and instantiates the Serialization Object
+ *
+ * Important to note here that we are not relying on the Hadoop SerializationFactory part of the
+ * Serialization framework. This is because in the case of Non-Writable Objects, we cannot make any
+ * assumptions about the uniformity of the serialization class APIs - i.e., there may not be a "write"
+ * method call and a subclass may need to implement its own Serialization classes.
+ * The SerializationFactory currently returns the first (de)serializer that is compatible
+ * with the class to be deserialized; in this context, that assumption isn't necessarily true.
+ *
+ * @return the serialization object for this context
+ * @exception does not currently throw any IOException
+ */
+ public Serialization<S> getSerialization() throws IOException {
+ Class<Serialization<S>> tClass = (Class<Serialization<S>>)conf.getClass(SerializationImplKey, null, Serialization.class);
+ return tClass == null ? null : (Serialization<S>)ReflectionUtils.newInstance(tClass, conf);
+ }
+ }
+
+ /**
+   * A {@link RecordReader} for plain files with {@link Deserializer} records
+ *
+ * Reads one row at a time of type R.
+ * R is intended to be a base class of something such as: Record, Writable, Text, ...
+ *
+ */
+ public class FlatFileRecordReader<R> implements RecordReader<Void, FlatFileInputFormat.RowContainer<R>> {
+
+ /**
+ * An interface for a helper class for instantiating {@link Serialization} classes.
+ */
+ /**
+ * The stream in use - is fsin if not compressed, otherwise, it is dcin.
+ */
+ private final DataInputStream in;
+
+ /**
+ * The decompressed stream or null if the input is not decompressed.
+ */
+ private final InputStream dcin;
+
+ /**
+ * The underlying stream.
+ */
+ private final FSDataInputStream fsin;
+
+ /**
+ * For calculating progress
+ */
+ private final long end;
+
+ /**
+ * The constructed deserializer
+ */
+ private final Deserializer<R> deserializer;
+
+ /**
+ * Once EOF is reached, stop calling the deserializer
+ */
+ private boolean isEOF;
+
+ /**
+ * The JobConf which contains information needed to instantiate the correct Deserializer
+ */
+ private Configuration conf;
+
+ /**
+     * The actual class of the rows we are deserializing, not just the base class
+ */
+ private Class<R> realRowClass;
+
+
+ /**
+     * The FlatFileRecordReader constructor opens the underlying stream (potentially decompressed) and
+ * creates the deserializer.
+ *
+ * @param conf the jobconf
+ * @param split the split for this file
+ */
+ public FlatFileRecordReader(Configuration conf,
+ FileSplit split) throws IOException {
+ final Path path = split.getPath();
+ FileSystem fileSys = path.getFileSystem(conf);
+ CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
+ final CompressionCodec codec = compressionCodecs.getCodec(path);
+ this.conf = conf;
+
+ fsin = fileSys.open(path);
+ if (codec != null) {
+ dcin = codec.createInputStream(fsin);
+ in = new DataInputStream(dcin);
+ } else {
+ dcin = null;
+ in = fsin;
+ }
+
+ isEOF = false;
+ end = split.getLength();
+
+ // Instantiate a SerializationContext which this will use to lookup the Serialization class and the
+ // actual class being deserialized
+ SerializationContext<R> sinfo;
+ Class<SerializationContext<R>> sinfoClass =
+ (Class<SerializationContext<R>>)conf.getClass(SerializationContextImplKey, SerializationContextFromConf.class);
+
+ sinfo = (SerializationContext<R>)ReflectionUtils.newInstance(sinfoClass, conf);
+
+ // Get the Serialization object and the class being deserialized
+ Serialization<R> serialization = sinfo.getSerialization();
+ realRowClass = (Class<R>)sinfo.getRealClass();
+
+ deserializer = (Deserializer<R>)serialization.getDeserializer((Class<R>)realRowClass);
+ deserializer.open(in);
+ }
+
+ /**
+ * The actual class of the data being deserialized
+ */
+ private Class<R> realRowclass;
+
+ /**
+ * The JobConf key of the SerializationContext to use
+ */
+ static public final String SerializationContextImplKey = "mapred.input.serialization.context_impl";
+
+ /**
+ * @return null
+ */
+ public Void createKey() {
+ return null;
+ }
+
+ /**
+ * @return a new R instance.
+ */
+ public RowContainer<R> createValue() {
+ RowContainer<R> r = new RowContainer<R>();
+ r.row = (R)ReflectionUtils.newInstance(realRowClass, conf);
+ return r;
+ }
+
+ /**
+ * Returns the next row # and value
+ *
+ * @param key - void as these files have a value only
+ * @param value - the row container which is always re-used, but the internal value may be set to a new Object
+ * @return whether the key and value were read. True if they were and false if EOF
+ * @exception IOException from the deserializer
+ */
+ public synchronized boolean next(Void key, RowContainer<R> value) throws IOException {
+ if(isEOF || in.available() == 0) {
+ isEOF = true;
+ return false;
+ }
+
+ // the deserializer is responsible for actually reading each record from the stream
+ try {
+ value.row = deserializer.deserialize(value.row);
+ if (value.row == null) {
+ isEOF = true;
+ return false;
+ }
+ return true;
+ } catch(EOFException e) {
+ isEOF = true;
+ return false;
+ }
+ }
+
+ public synchronized float getProgress() throws IOException {
+ // this assumes no splitting
+ if (end == 0) {
+ return 0.0f;
+ } else {
+ // gives progress over uncompressed stream
+ // assumes deserializer is not buffering itself
+ return Math.min(1.0f, fsin.getPos()/(float)(end));
+ }
+ }
+
+ public synchronized long getPos() throws IOException {
+ // assumes deserializer is not buffering itself
+ // position over uncompressed stream. not sure what
+ // effect this has on stats about job
+ return fsin.getPos();
+ }
+
+ public synchronized void close() throws IOException {
+ // assuming that this closes the underlying streams
+ deserializer.close();
+ }
+ }
+
+ protected boolean isSplittable(FileSystem fs, Path filename) {
+ return false;
+ }
+
+ public RecordReader<Void, RowContainer<T>> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter)
+ throws IOException {
+
+ reporter.setStatus(split.toString());
+
+ return new FlatFileRecordReader<T>(job, (FileSplit) split);
+ }
+}
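
A sketch of how a job could be wired up to use the new FlatFileInputFormat through the JobConf keys defined above. WritableSerialization and Text are stand-ins chosen for illustration; any Serialization implementation and record class could be plugged in, and the SerializationContext defaults to SerializationContextFromConf.

    import org.apache.hadoop.hive.ql.io.FlatFileInputFormat;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.Serialization;
    import org.apache.hadoop.io.serializer.WritableSerialization;
    import org.apache.hadoop.mapred.JobConf;

    public class FlatFileJobSetupSketch {
        public static JobConf configure(JobConf job) {
            // Read records through FlatFileInputFormat.
            job.setInputFormat(FlatFileInputFormat.class);
            // Which Serialization implementation to instantiate ...
            job.setClass(FlatFileInputFormat.SerializationImplKey,
                         WritableSerialization.class, Serialization.class);
            // ... and which concrete record class it should deserialize.
            job.setClass(FlatFileInputFormat.SerializationContextFromConf.SerializationSubclassKey,
                         Text.class, Object.class);
            return job;
        }
    }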
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Mon Nov 10 17:50:06 2008
@@ -219,7 +219,6 @@
return result.toArray(new HiveInputSplit[result.size()]);
}
-
private tableDesc getTableDescFromPath(Path dir) throws IOException {
partitionDesc partDesc = pathToPartitionInfo.get(dir.toString());
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Nov 10 17:50:06 2008
@@ -38,13 +38,14 @@
import org.apache.hadoop.hive.metastore.MetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
@@ -170,6 +171,7 @@
}
tbl.setSerializationLib(org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe.class.getName());
tbl.setNumBuckets(bucketCount);
+ tbl.setBucketCols(bucketCols);
createTable(tbl);
}
@@ -196,6 +198,9 @@
public void createTable(Table tbl) throws HiveException {
try {
tbl.initSerDe();
+ if(tbl.getCols().size() == 0) {
+ tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
+ }
tbl.checkValidity();
msc.createTable(tbl.getTTable());
} catch (Exception e) {
@@ -652,5 +657,16 @@
}
return new MetaStoreClient(this.conf);
}
+
+ public static List<FieldSchema> getFieldsFromDeserializer(String name, Deserializer serde) throws HiveException {
+ try {
+ return MetaStoreUtils.getFieldsFromDeserializer(name, serde);
+ } catch (SerDeException e) {
+ throw new HiveException("Error in getting fields from serde.", e);
+ } catch (MetaException e) {
+ throw new HiveException("Error in getting fields from serde.", e);
+ }
+ }
+
};
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Mon Nov 10 17:50:06 2008
@@ -36,7 +36,6 @@
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
/**
@@ -205,6 +204,10 @@
return(ret);
}
+ public Path getPartitionPath() {
+ return this.partPath;
+ }
+
final public URI getDataLocation() {
return this.partURI;
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Mon Nov 10 17:50:06 2008
@@ -23,6 +23,7 @@
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -40,13 +41,13 @@
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
-import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
@@ -129,6 +130,14 @@
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
}
+ public void reinitSerDe() throws HiveException {
+ try {
+ deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(), this.getTTable());
+ } catch (MetaException e) {
+ throw new HiveException(e);
+ }
+ }
+
protected void initSerDe() throws HiveException {
if (deserializer == null) {
try {
@@ -138,13 +147,16 @@
}
}
}
-
+
public void checkValidity() throws HiveException {
// check for validity
String name = getTTable().getTableName();
if (null == name || name.length() == 0 || !MetaStoreUtils.validateName(name)) {
throw new HiveException("[" + name + "]: is not a valid table name");
}
+ if (0 == getCols().size()) {
+      throw new HiveException("at least one column must be specified for the table");
+ }
if (null == getDeserializer()) {
throw new HiveException("must specify a non-null serDe");
}
@@ -154,6 +166,30 @@
if (null == getOutputFormatClass()) {
throw new HiveException("must specify an OutputFormat class");
}
+
+ Iterator<FieldSchema> iterCols = getCols().iterator();
+ List<String> colNames = new ArrayList<String>();
+ while (iterCols.hasNext()) {
+ String colName = iterCols.next().getName();
+ Iterator<String> iter = colNames.iterator();
+ while (iter.hasNext()) {
+ String oldColName = iter.next();
+ if (colName.equalsIgnoreCase(oldColName))
+ throw new HiveException("Duplicate column name " + colName + " in the table definition.");
+ }
+ colNames.add(colName.toLowerCase());
+ }
+
+ if (getPartCols() != null)
+ {
+ // there is no overlap between columns and partitioning columns
+ Iterator<FieldSchema> partColsIter = getPartCols().iterator();
+ while (partColsIter.hasNext()) {
+ String partCol = partColsIter.next().getName();
+ if(colNames.contains(partCol.toLowerCase()))
+        throw new HiveException("Partition column name " + partCol + " conflicts with table columns.");
+ }
+ }
return;
}
@@ -190,6 +226,13 @@
}
final public Deserializer getDeserializer() {
+ if(deserializer == null) {
+ try {
+ initSerDe();
+ } catch (HiveException e) {
+ LOG.error("Error in initializing serde.", e);
+ }
+ }
return deserializer;
}
@@ -360,9 +403,30 @@
}
public List<FieldSchema> getCols() {
- return getTTable().getSd().getCols();
+ boolean isNative = SerDeUtils.isNativeSerDe(getSerializationLib());
+ if (isNative)
+ return getTTable().getSd().getCols();
+ else {
+ try {
+ return Hive.getFieldsFromDeserializer(getName(), getDeserializer());
+ } catch (HiveException e) {
+ LOG.error("Unable to get field from serde: " + getSerializationLib(), e);
+ }
+ return new ArrayList<FieldSchema>();
+ }
}
+ /**
+   * Returns a list of all the columns of the table (partition columns followed by data columns, in that order).
+ *
+ * @return List<FieldSchema>
+ */
+ public List<FieldSchema> getAllCols() {
+ ArrayList<FieldSchema> f_list = new ArrayList<FieldSchema>();
+ f_list.addAll(getPartCols());
+ f_list.addAll(getCols());
+ return f_list;
+ }
public void setPartCols(List<FieldSchema> partCols) {
getTTable().setPartitionKeys(partCols);
}
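
The validation that checkValidity now performs can be reduced to two rules: no case-insensitive duplicate column names, and no overlap between data columns and partition columns. A small sketch under that assumption (class and method names are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ColumnValiditySketch {
        static void check(List<String> cols, List<String> partCols) {
            List<String> seen = new ArrayList<String>();
            for (String c : cols) {
                if (seen.contains(c.toLowerCase())) {
                    throw new IllegalArgumentException("Duplicate column name " + c);
                }
                seen.add(c.toLowerCase());
            }
            for (String p : partCols) {
                if (seen.contains(p.toLowerCase())) {
                    throw new IllegalArgumentException("Partition column " + p
                            + " conflicts with table columns.");
                }
            }
        }

        public static void main(String[] args) {
            check(Arrays.asList("id", "name"), Arrays.asList("ds"));   // ok
            try {
                check(Arrays.asList("id", "ID"), Arrays.asList("ds"));
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());                    // Duplicate column name ID
            }
        }
    }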
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=712905&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Mon Nov 10 17:50:06 2008
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.selectDesc;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Iterator;
+import java.util.ArrayList;
+
+/**
+ * Implementation of one of the rule-based optimization steps. ColumnPruner gets the current operator tree. The tree is traversed to find the columns used
+ * by each base table. If not all of a table's columns are used, a select is pushed on top of that table (to project only those columns). Since this
+ * changes the row resolver, the tree is built again. This can be optimized later to patch the tree.
+ */
+public class ColumnPruner implements Transform {
+ private ParseContext pctx;
+
+ /**
+ * empty constructor
+ */
+ public ColumnPruner() {
+ pctx = null;
+ }
+
+ /**
+ * Whether some column pruning needs to be done
+ * @param op Operator for the base table
+ * @param colNames columns needed by the query
+ * @return boolean
+ */
+ private boolean pushSelect(Operator<? extends Serializable> op, List<String> colNames) {
+ if (pctx.getOpParseCtx().get(op).getRR().getColumnInfos().size() == colNames.size()) return false;
+ return true;
+ }
+
+ /**
+ * update the map between operator and row resolver
+ * @param op operator being inserted
+ * @param rr row resolver of the operator
+ * @return
+ */
+ @SuppressWarnings("nls")
+ private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr) {
+ OpParseContext ctx = new OpParseContext(rr);
+ pctx.getOpParseCtx().put(op, ctx);
+ return op;
+ }
+
+ /**
+ * insert a select to include only columns needed by the query
+ * @param input operator for the base table
+ * @param colNames columns needed
+ * @return
+ * @throws SemanticException
+ */
+ @SuppressWarnings("nls")
+ private Operator genSelectPlan(Operator input, List<String> colNames)
+ throws SemanticException {
+
+ RowResolver inputRR = pctx.getOpParseCtx().get(input).getRR();
+ RowResolver outputRR = new RowResolver();
+ ArrayList<exprNodeDesc> col_list = new ArrayList<exprNodeDesc>();
+
+ // Iterate over the selects
+ for (int pos = 0; pos < colNames.size(); pos++) {
+ String internalName = colNames.get(pos);
+ String[] colName = inputRR.reverseLookup(internalName);
+ ColumnInfo in = inputRR.get(colName[0], colName[1]);
+ outputRR.put(colName[0], colName[1],
+ new ColumnInfo((Integer.valueOf(pos)).toString(), in.getType()));
+ col_list.add(new exprNodeColumnDesc(in.getType(), internalName));
+ }
+
+ Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new selectDesc(col_list), new RowSchema(outputRR.getColumnInfos()), input), outputRR);
+
+ return output;
+ }
+
+ /**
+ * reset parse context
+ * @param pctx parse context
+ */
+ private void resetParseContext(ParseContext pctx) {
+ pctx.getAliasToPruner().clear();
+ pctx.getAliasToSamplePruner().clear();
+ pctx.getLoadTableWork().clear();
+ pctx.getLoadFileWork().clear();
+ Iterator<Operator<? extends Serializable>> iter = pctx.getOpParseCtx().keySet().iterator();
+ while (iter.hasNext()) {
+ Operator<? extends Serializable> op = iter.next();
+ if ((!pctx.getTopOps().containsValue(op)) && (!pctx.getTopSelOps().containsValue(op)))
+ iter.remove();
+ }
+ }
+
+ /**
+   * Transform the query tree. For each table under consideration, check whether all of its columns are needed. If not, push a select of only the
+   * needed columns immediately after the table scan and proceed.
+ */
+ public ParseContext transform(ParseContext pactx) throws SemanticException {
+ this.pctx = pactx;
+ boolean done = true;
+ // generate useful columns for all the sources so that they can be pushed immediately after the table scan
+ for (String alias_id : pctx.getTopOps().keySet()) {
+ Operator<? extends Serializable> topOp = pctx.getTopOps().get(alias_id);
+
+ // Scan the tree bottom-up and generate columns needed for the top operator
+ List<String> colNames = topOp.genColLists(pctx.getOpParseCtx());
+
+      // push a SELECT if not all of the table's columns are used
+ if (pushSelect(topOp, colNames)) {
+ topOp.setChildOperators(null);
+
+ // Generate a select and make it a child of the table scan
+ Operator select = genSelectPlan(topOp, colNames);
+ pctx.getTopSelOps().put(alias_id, select);
+ done = false;
+ }
+ }
+
+ // a select was pushed on top of the table. The old plan is no longer valid. Generate the plan again.
+ // The current tables and the select pushed above (after column pruning) are maintained in the parse context.
+ if (!done) {
+ SemanticAnalyzer sem = (SemanticAnalyzer)SemanticAnalyzerFactory.get(pctx.getConf(), pctx.getParseTree());
+
+ resetParseContext(pctx);
+ sem.init(pctx);
+ QB qb = new QB(null, null, false);
+
+ sem.doPhase1(pctx.getParseTree(), qb, sem.initPhase1Ctx());
+ sem.getMetaData(qb);
+ sem.genPlan(qb);
+ pctx = sem.getParseContext();
+ }
+ return pctx;
+ }
+}
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=712905&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Nov 10 17:50:06 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Implementation of the optimizer
+ */
+public class Optimizer {
+ private ParseContext pctx;
+ private List<Transform> transformations;
+
+ /**
+ * empty constructor
+ */
+ public Optimizer() {
+ }
+
+ /**
+ * create the list of transformations
+ */
+ public void initialize() {
+ transformations = new ArrayList<Transform>();
+ transformations.add(new ColumnPruner());
+ }
+
+ /**
+ * invoke all the transformations one-by-one, and alter the query plan
+ * @return ParseContext
+ * @throws SemanticException
+ */
+ public ParseContext optimize() throws SemanticException {
+ for (Transform t : transformations)
+ pctx = t.transform(pctx);
+ return pctx;
+ }
+
+ /**
+ * @return the pctx
+ */
+ public ParseContext getPctx() {
+ return pctx;
+ }
+
+ /**
+ * @param pctx the pctx to set
+ */
+ public void setPctx(ParseContext pctx) {
+ this.pctx = pctx;
+ }
+
+
+}
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Transform.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Transform.java?rev=712905&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Transform.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Transform.java Mon Nov 10 17:50:06 2008
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Optimizer interface. All the rule-based optimizations implement this interface. All the transformations are invoked sequentially. They take the current
+ * parse context (which contains the operator tree among other things), apply the transformation, and return the updated parse context.
+ */
+public interface Transform {
+ /**
+ * All transformation steps implement this interface
+ * @param pctx input parse context
+ * @return ParseContext
+ * @throws SemanticException
+ */
+ public ParseContext transform(ParseContext pctx) throws SemanticException;
+}
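
A sketch of what an optimization step looks like against this interface. IdentityTransform is hypothetical; ColumnPruner above is the real example, and Optimizer.initialize() is where a step gets registered.

    package org.apache.hadoop.hive.ql.optimizer;

    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class IdentityTransform implements Transform {
        // A transformation inspects or rewrites the operator tree held by the
        // parse context and returns the (possibly updated) context.
        public ParseContext transform(ParseContext pctx) throws SemanticException {
            return pctx;   // no-op: leaves the plan unchanged
        }
    }

    // Registered in Optimizer.initialize() via:
    //     transformations.add(new IdentityTransform());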
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Mon Nov 10 17:50:06 2008
@@ -104,9 +104,10 @@
}
public static String stripQuotes(String val) throws SemanticException {
- if (val.charAt(0) == '\'' && val.charAt(val.length() - 1) == '\'') {
+ if ((val.charAt(0) == '\'' && val.charAt(val.length() - 1) == '\'')
+ || (val.charAt(0) == '\"' && val.charAt(val.length() - 1) == '\"')) {
val = val.substring(1, val.length() - 1);
- }
+ }
return val;
}
@@ -142,19 +143,48 @@
}
}
+ /**
+ * Remove the enclosing "`" pair from the identifier.
+ * We allow users to use "`" to escape identifiers for table names,
+ * column names and aliases, in case they coincide with Hive language
+ * keywords.
+ */
+ public static String unescapeIdentifier(String val) {
+ if (val == null) {
+ return null;
+ }
+ if (val.charAt(0) == '`' && val.charAt(val.length() - 1) == '`') {
+ val = val.substring(1, val.length() - 1);
+ }
+ return val;
+ }
+
@SuppressWarnings("nls")
public static String unescapeSQLString(String b) {
- assert(b.charAt(0) == '\'');
- assert(b.charAt(b.length()-1) == '\'');
+
+ Character enclosure = null;
// Some of the strings can be passed in as unicode. For example, the
// delimiter can be passed in as \002 - So, we first check if the
// string is a unicode number, else go back to the old behavior
StringBuilder sb = new StringBuilder(b.length());
- int i = 1;
- while (i < (b.length()-1)) {
-
- if (b.charAt(i) == '\\' && (i+4 < b.length())) {
+ for (int i=0; i < b.length(); i++) {
+
+ char currentChar = b.charAt(i);
+ if (enclosure == null) {
+ if (currentChar == '\'' || currentChar == '\"') {
+ enclosure = currentChar;
+ }
+ // ignore all other chars outside the enclosure
+ continue;
+ }
+
+ if (enclosure.equals(currentChar)) {
+ enclosure = null;
+ continue;
+ }
+
+ if (currentChar == '\\' && (i+4 < b.length())) {
char i1 = b.charAt(i+1);
char i2 = b.charAt(i+2);
char i3 = b.charAt(i+3);
@@ -167,12 +197,12 @@
bValArr[0] = bVal;
String tmp = new String(bValArr);
sb.append(tmp);
- i += 4;
+ i += 3;
continue;
}
}
-
- if (b.charAt(i) == '\\' && (i+2 < b.length())) {
+
+ if (currentChar == '\\' && (i+2 < b.length())) {
char n=b.charAt(i+1);
switch(n) {
case '0': sb.append("\0"); break;
@@ -191,9 +221,8 @@
}
i++;
} else {
- sb.append(b.charAt(i));
+ sb.append(currentChar);
}
- i++;
}
return sb.toString();
}
@@ -219,7 +248,7 @@
try {
// get table metadata
- tableName = ast.getChild(0).getText();
+ tableName = unescapeIdentifier(ast.getChild(0).getText());
tableHandle = db.getTable(tableName);
// get partition metadata if partition specified
@@ -230,7 +259,7 @@
for (int i = 0; i < partspec.getChildCount(); ++i) {
CommonTree partspec_val = (CommonTree) partspec.getChild(i);
String val = stripQuotes(partspec_val.getChild(1).getText());
- partSpec.put(partspec_val.getChild(0).getText(), val);
+ partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText()), val);
}
partHandle = Hive.get().getPartition(tableHandle, partSpec, forceCreatePartition);
if(partHandle == null) {
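
To make the behaviour of the helpers changed above concrete, a small sketch with assumed inputs (the expected values follow from the code, not from the commit's tests):

  // Illustrative only; stripQuotes declares SemanticException.
  String id  = BaseSemanticAnalyzer.unescapeIdentifier("`items`");   // "items"    (backtick pair removed)
  String val = BaseSemanticAnalyzer.stripQuotes("\"part=foo\"");     // "part=foo" (double quotes now accepted as well as single quotes)
  String lit = BaseSemanticAnalyzer.unescapeSQLString("'a\\tb'");    // "a<TAB>b"  (escape sequences decoded inside the enclosure)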
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Mon Nov 10 17:50:06 2008
@@ -27,6 +27,7 @@
import org.antlr.runtime.tree.CommonTree;
import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -45,12 +46,15 @@
import org.apache.hadoop.hive.ql.plan.showTablesDesc;
import org.apache.hadoop.hive.ql.plan.alterTableDesc.alterTableTypes;
import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
private static final Log LOG = LogFactory.getLog("hive.ql.parse.DDLSemanticAnalyzer");
public static final Map<Integer, String> TokenToTypeName = new HashMap<Integer, String>();
static {
+ TokenToTypeName.put(HiveParser.TOK_BOOLEAN, Constants.BOOLEAN_TYPE_NAME);
TokenToTypeName.put(HiveParser.TOK_TINYINT, Constants.TINYINT_TYPE_NAME);
+ TokenToTypeName.put(HiveParser.TOK_SMALLINT, Constants.SMALLINT_TYPE_NAME);
TokenToTypeName.put(HiveParser.TOK_INT, Constants.INT_TYPE_NAME);
TokenToTypeName.put(HiveParser.TOK_BIGINT, Constants.BIGINT_TYPE_NAME);
TokenToTypeName.put(HiveParser.TOK_FLOAT, Constants.FLOAT_TYPE_NAME);
@@ -96,18 +100,26 @@
analyzeAlterTableModifyCols(ast, alterTableTypes.REPLACECOLS);
else if (ast.getToken().getType() == HiveParser.TOK_ALTERTABLE_DROPPARTS)
analyzeAlterTableDropParts(ast);
+ else if (ast.getToken().getType() == HiveParser.TOK_ALTERTABLE_PROPERTIES)
+ analyzeAlterTableProps(ast);
+ else if (ast.getToken().getType() == HiveParser.TOK_ALTERTABLE_SERDEPROPERTIES)
+ analyzeAlterTableSerdeProps(ast);
+ else if (ast.getToken().getType() == HiveParser.TOK_ALTERTABLE_SERIALIZER)
+ analyzeAlterTableSerde(ast);
else if (ast.getToken().getType() == HiveParser.TOK_SHOWPARTITIONS)
{
ctx.setResFile(new Path(getTmpFileName()));
analyzeShowPartitions(ast);
}
+ else {
+ throw new SemanticException("Unsupported command.");
+ }
}
private void analyzeCreateTable(CommonTree ast, boolean isExt)
throws SemanticException {
- String tableName = ast.getChild(0).getText();
- CommonTree colList = (CommonTree)ast.getChild(1);
- List<FieldSchema> cols = getColumns(colList);
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ List<FieldSchema> cols = null;
List<FieldSchema> partCols = null;
List<String> bucketCols = null;
List<Order> sortCols = null;
@@ -117,19 +129,23 @@
String mapKeyDelim = null;
String lineDelim = null;
String comment = null;
- boolean isSequenceFile = false;
+ boolean isSequenceFile =
+ "SequenceFile".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT));
String location = null;
String serde = null;
Map<String, String> mapProp = null;
LOG.info("Creating table" + tableName);
int numCh = ast.getChildCount();
- for (int num = 2; num < numCh; num++)
+ for (int num = 1; num < numCh; num++)
{
CommonTree child = (CommonTree)ast.getChild(num);
switch (child.getToken().getType()) {
+ case HiveParser.TOK_TABCOLLIST:
+ cols = getColumns(child);
+ break;
case HiveParser.TOK_TABLECOMMENT:
- comment = child.getChild(0).getText();
+ comment = unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLEPARTCOLS:
partCols = getColumns((CommonTree)child.getChild(0));
@@ -181,6 +197,9 @@
case HiveParser.TOK_TBLSEQUENCEFILE:
isSequenceFile = true;
break;
+ case HiveParser.TOK_TBLTEXTFILE:
+ isSequenceFile = false;
+ break;
case HiveParser.TOK_TABLELOCATION:
location = unescapeSQLString(child.getChild(0).getText());
break;
@@ -203,6 +222,15 @@
// no duplicate column names
// currently, it is a simple n*n algorithm - this can be optimized later if need be
// but it should not be a major bottleneck as the number of columns are anyway not so big
+
+ if((crtTblDesc.getCols() == null) || (crtTblDesc.getCols().size() == 0)) {
+ // if no columns are given, a non-native custom serde must be specified
+ if(StringUtils.isEmpty(crtTblDesc.getSerName()) || SerDeUtils.isNativeSerDe(crtTblDesc.getSerName())) {
+ throw new SemanticException(ErrorMsg.INVALID_TBL_DDL_SERDE.getMsg());
+ }
+ return;
+ }
+
Iterator<FieldSchema> iterCols = crtTblDesc.getCols().iterator();
List<String> colNames = new ArrayList<String>();
while (iterCols.hasNext()) {
@@ -264,9 +292,9 @@
String partCol = partColsIter.next().getName();
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
- String colName = colNamesIter.next();
+ String colName = unescapeIdentifier(colNamesIter.next());
if (partCol.equalsIgnoreCase(colName))
- throw new SemanticException(ErrorMsg.COLUMN_REPAEATED_IN_PARTITIONING_COLS.getMsg());
+ throw new SemanticException(ErrorMsg.COLUMN_REPEATED_IN_PARTITIONING_COLS.getMsg());
}
}
}
@@ -274,11 +302,52 @@
private void analyzeDropTable(CommonTree ast)
throws SemanticException {
- String tableName = ast.getChild(0).getText();
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
dropTableDesc dropTblDesc = new dropTableDesc(tableName);
rootTasks.add(TaskFactory.get(new DDLWork(dropTblDesc), conf));
}
+ private void analyzeAlterTableProps(CommonTree ast) throws SemanticException {
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ HashMap<String, String> mapProp = getProps((CommonTree)(ast.getChild(1)).getChild(0));
+ alterTableDesc alterTblDesc = new alterTableDesc(alterTableTypes.ADDPROPS);
+ alterTblDesc.setProps(mapProp);
+ alterTblDesc.setOldName(tableName);
+ rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ }
+
+ private void analyzeAlterTableSerdeProps(CommonTree ast) throws SemanticException {
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ HashMap<String, String> mapProp = getProps((CommonTree)(ast.getChild(1)).getChild(0));
+ alterTableDesc alterTblDesc = new alterTableDesc(alterTableTypes.ADDSERDEPROPS);
+ alterTblDesc.setProps(mapProp);
+ alterTblDesc.setOldName(tableName);
+ rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ }
+
+ private void analyzeAlterTableSerde(CommonTree ast) throws SemanticException {
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ String serdeName = unescapeSQLString(ast.getChild(1).getText());
+ alterTableDesc alterTblDesc = new alterTableDesc(alterTableTypes.ADDSERDE);
+ if(ast.getChildCount() > 2) {
+ HashMap<String, String> mapProp = getProps((CommonTree)(ast.getChild(2)).getChild(0));
+ alterTblDesc.setProps(mapProp);
+ }
+ alterTblDesc.setOldName(tableName);
+ alterTblDesc.setSerdeName(serdeName);
+ rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ }
+
+ private HashMap<String, String> getProps(CommonTree prop) {
+ HashMap<String, String> mapProp = new HashMap<String, String>();
+ for (int propChild = 0; propChild < prop.getChildCount(); propChild++) {
+ String key = unescapeSQLString(prop.getChild(propChild).getChild(0).getText());
+ String value = unescapeSQLString(prop.getChild(propChild).getChild(1).getText());
+ mapProp.put(key,value);
+ }
+ return mapProp;
+ }
+
private List<FieldSchema> getColumns(CommonTree ast)
{
List<FieldSchema> colList = new ArrayList<FieldSchema>();
@@ -286,7 +355,7 @@
for (int i = 0; i < numCh; i++) {
FieldSchema col = new FieldSchema();
CommonTree child = (CommonTree)ast.getChild(i);
- col.setName(child.getChild(0).getText());
+ col.setName(unescapeIdentifier(child.getChild(0).getText()));
CommonTree typeChild = (CommonTree)(child.getChild(1));
if (typeChild.getToken().getType() == HiveParser.TOK_LIST)
{
@@ -303,7 +372,7 @@
col.setType(getTypeName(typeChild.getToken().getType()));
if (child.getChildCount() == 3)
- col.setComment(child.getChild(2).getText());
+ col.setComment(unescapeSQLString(child.getChild(2).getText()));
colList.add(col);
}
return colList;
@@ -315,7 +384,7 @@
int numCh = ast.getChildCount();
for (int i = 0; i < numCh; i++) {
CommonTree child = (CommonTree)ast.getChild(i);
- colList.add(child.getText());
+ colList.add(unescapeIdentifier(child.getText()));
}
return colList;
}
@@ -327,9 +396,9 @@
for (int i = 0; i < numCh; i++) {
CommonTree child = (CommonTree)ast.getChild(i);
if (child.getToken().getType() == HiveParser.TOK_TABSORTCOLNAMEASC)
- colList.add(new Order(child.getChild(0).getText(), 1));
+ colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()), 1));
else
- colList.add(new Order(child.getChild(0).getText(), 0));
+ colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()), 0));
}
return colList;
}
@@ -359,7 +428,7 @@
private void analyzeShowPartitions(CommonTree ast)
throws SemanticException {
showPartitionsDesc showPartsDesc;
- String tableName = ast.getChild(0).getText();
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
showPartsDesc = new showPartitionsDesc(tableName, ctx.getResFile());
rootTasks.add(TaskFactory.get(new DDLWork(showPartsDesc), conf));
}
@@ -379,13 +448,15 @@
private void analyzeAlterTableRename(CommonTree ast)
throws SemanticException {
- alterTableDesc alterTblDesc = new alterTableDesc(ast.getChild(0).getText(), ast.getChild(1).getText());
+ alterTableDesc alterTblDesc = new alterTableDesc(
+ unescapeIdentifier(ast.getChild(0).getText()),
+ unescapeIdentifier(ast.getChild(1).getText()));
rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
}
private void analyzeAlterTableModifyCols(CommonTree ast, alterTableTypes alterType)
throws SemanticException {
- String tblName = ast.getChild(0).getText();
+ String tblName = unescapeIdentifier(ast.getChild(0).getText());
List<FieldSchema> newCols = getColumns((CommonTree)ast.getChild(1));
alterTableDesc alterTblDesc = new alterTableDesc(tblName, newCols, alterType);
rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
@@ -396,7 +467,7 @@
List<HashMap<String, String>> partSpecs = new ArrayList<HashMap<String, String>>();
int childIndex = 0;
// get table metadata
- tblName = ast.getChild(0).getText();
+ tblName = unescapeIdentifier(ast.getChild(0).getText());
// get partition metadata if partition specified
for( childIndex = 1; childIndex < ast.getChildCount(); childIndex++) {
CommonTree partspec = (CommonTree) ast.getChild(childIndex);
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java Mon Nov 10 17:50:06 2008
@@ -38,6 +38,7 @@
INVALID_OPERATOR_SIGNATURE("Operator Argument Type Mismatch"),
INVALID_JOIN_CONDITION_1("Both Left and Right Aliases Encountered in Join"),
INVALID_JOIN_CONDITION_2("Neither Left nor Right Aliases Encountered in Join"),
+ INVALID_JOIN_CONDITION_3("OR not supported in Join currently"),
INVALID_TRANSFORM("TRANSFORM with Other Select Columns not Supported"),
DUPLICATE_GROUPBY_KEY("Repeated Key in Group By"),
UNSUPPORTED_MULTIPLE_DISTINCTS("DISTINCT on Different Columns not Supported"),
@@ -52,14 +53,18 @@
INVALID_MAPINDEX_TYPE("Map Key Type does not Match Index Expression Type"),
NON_COLLECTION_TYPE("[] not Valid on Non Collection Types"),
SELECT_DISTINCT_WITH_GROUPBY("SELECT DISTINCT and GROUP BY can not be in the same query"),
- COLUMN_REPAEATED_IN_PARTITIONING_COLS("Column repeated in partitioning columns"),
+ COLUMN_REPEATED_IN_PARTITIONING_COLS("Column repeated in partitioning columns"),
DUPLICATE_COLUMN_NAMES("Duplicate column names"),
COLUMN_REPEATED_IN_CLUSTER_SORT("Same column cannot appear in cluster and sort by"),
SAMPLE_RESTRICTION("Cannot Sample on More Than Two Columns"),
SAMPLE_COLUMN_NOT_FOUND("Sample Column Not Found"),
NO_PARTITION_PREDICATE("No Partition Predicate Found"),
- INVALID_DOT(". operator is only supported on struct or list of struct types");
-
+ INVALID_DOT(". operator is only supported on struct or list of struct types"),
+ INVALID_TBL_DDL_SERDE("Either list of columns or a custom serializer should be specified"),
+ TARGET_TABLE_COLUMN_MISMATCH("Cannot insert into target table because column number/types are different"),
+ TABLE_ALIAS_NOT_ALLOWED("Table Alias not Allowed in Sampling Clause"),
+ NON_BUCKETED_TABLE("Sampling Expression Needed for Non-Bucketed Table");
+
private String mesg;
ErrorMsg(String mesg) {
this.mesg = mesg;
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Mon Nov 10 17:50:06 2008
@@ -49,7 +49,7 @@
TOK_TRUE;
TOK_FALSE;
TOK_TRANSFORM;
-TOK_COLLIST;
+TOK_EXPLIST;
TOK_ALIASLIST;
TOK_GROUPBY;
TOK_ORDERBY;
@@ -64,6 +64,7 @@
TOK_ISNULL;
TOK_ISNOTNULL;
TOK_TINYINT;
+TOK_SMALLINT;
TOK_INT;
TOK_BIGINT;
TOK_BOOLEAN;
@@ -81,6 +82,9 @@
TOK_ALTERTABLE_ADDCOLS;
TOK_ALTERTABLE_REPLACECOLS;
TOK_ALTERTABLE_DROPPARTS;
+TOK_ALTERTABLE_SERDEPROPERTIES;
+TOK_ALTERTABLE_SERIALIZER;
+TOK_ALTERTABLE_PROPERTIES;
TOK_SHOWTABLES;
TOK_SHOWPARTITIONS;
TOK_CREATEEXTTABLE;
@@ -96,6 +100,7 @@
TOK_TABLEROWFORMATMAPKEYS;
TOK_TABLEROWFORMATLINES;
TOK_TBLSEQUENCEFILE;
+TOK_TBLTEXTFILE;
TOK_TABCOLNAME;
TOK_TABLELOCATION;
TOK_TABLESAMPLE;
@@ -106,10 +111,10 @@
TOK_CREATEFUNCTION;
TOK_EXPLAIN;
TOK_TABLESERIALIZER;
-TOK_TABLSERDEPROPERTIES;
-TOK_TABLESERDEPROPLIST;
+TOK_TABLEPROPERTIES;
+TOK_TABLEPROPLIST;
TOK_LIMIT;
-TOKTABLESERDEPROPERTY;
+TOK_TABLEPROPERTY;
}
@@ -157,9 +162,9 @@
;
createStatement
- : KW_CREATE (ext=KW_EXTERNAL)? KW_TABLE name=Identifier LPAREN columnNameTypeList RPAREN tableComment? tablePartition? tableBuckets? tableRowFormat? tableFileFormat? tableLocation?
- -> {$ext == null}? ^(TOK_CREATETABLE $name columnNameTypeList tableComment? tablePartition? tableBuckets? tableRowFormat? tableFileFormat? tableLocation?)
- -> ^(TOK_CREATEEXTTABLE $name columnNameTypeList tableComment? tablePartition? tableBuckets? tableRowFormat? tableFileFormat? tableLocation?)
+ : KW_CREATE (ext=KW_EXTERNAL)? KW_TABLE name=Identifier (LPAREN columnNameTypeList RPAREN)? tableComment? tablePartition? tableBuckets? tableRowFormat? tableFileFormat? tableLocation?
+ -> {$ext == null}? ^(TOK_CREATETABLE $name columnNameTypeList? tableComment? tablePartition? tableBuckets? tableRowFormat? tableFileFormat? tableLocation?)
+ -> ^(TOK_CREATEEXTTABLE $name columnNameTypeList? tableComment? tablePartition? tableBuckets? tableRowFormat? tableFileFormat? tableLocation?)
;
dropStatement
@@ -170,6 +175,8 @@
: alterStatementRename
| alterStatementAddCol
| alterStatementDropPartitions
+ | alterStatementProperties
+ | alterStatementSerdeProperties
;
alterStatementRename
@@ -188,6 +195,18 @@
-> ^(TOK_ALTERTABLE_DROPPARTS Identifier partitionSpec+)
;
+alterStatementProperties
+ : KW_ALTER KW_TABLE name=Identifier KW_SET KW_PROPERTIES tableProperties
+ -> ^(TOK_ALTERTABLE_PROPERTIES $name tableProperties)
+ ;
+
+alterStatementSerdeProperties
+ : KW_ALTER KW_TABLE name=Identifier KW_SET KW_SERDE serde=StringLiteral (KW_WITH KW_SERDEPROPERTIES tableProperties)?
+ -> ^(TOK_ALTERTABLE_SERIALIZER $name $serde tableProperties?)
+ | KW_ALTER KW_TABLE name=Identifier KW_SET KW_SERDEPROPERTIES tableProperties
+ -> ^(TOK_ALTERTABLE_SERDEPROPERTIES $name tableProperties)
+ ;
+
descStatement
: KW_DESCRIBE (isExtended=KW_EXTENDED)? (tab=tabName) -> ^(TOK_DESCTABLE $tab $isExtended?)
;
@@ -227,23 +246,23 @@
:
KW_ROW KW_FORMAT KW_DELIMITED tableRowFormatFieldIdentifier? tableRowFormatCollItemsIdentifier? tableRowFormatMapKeysIdentifier? tableRowFormatLinesIdentifier?
-> ^(TOK_TABLEROWFORMAT tableRowFormatFieldIdentifier? tableRowFormatCollItemsIdentifier? tableRowFormatMapKeysIdentifier? tableRowFormatLinesIdentifier?)
- | KW_ROW KW_FORMAT KW_SERIALIZER name=StringLiteral tableSerializerProperties?
- -> ^(TOK_TABLESERIALIZER $name tableSerializerProperties?)
+ | KW_ROW KW_FORMAT KW_SERDE name=StringLiteral (KW_WITH KW_SERDEPROPERTIES serdeprops=tableProperties)?
+ -> ^(TOK_TABLESERIALIZER $name $serdeprops?)
;
-tableSerializerProperties
+tableProperties
:
- KW_WITH KW_PROPERTIES LPAREN propertiesList RPAREN -> ^(TOK_TABLSERDEPROPERTIES propertiesList)
+ LPAREN propertiesList RPAREN -> ^(TOK_TABLEPROPERTIES propertiesList)
;
propertiesList
:
- keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_TABLESERDEPROPLIST keyValueProperty+)
+ keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_TABLEPROPLIST keyValueProperty+)
;
keyValueProperty
:
- key=StringLiteral EQUAL value=StringLiteral -> ^(TOKTABLESERDEPROPERTY $key $value)
+ key=StringLiteral EQUAL value=StringLiteral -> ^(TOK_TABLEPROPERTY $key $value)
;
tableRowFormatFieldIdentifier
@@ -273,6 +292,7 @@
tableFileFormat
:
KW_STORED KW_AS KW_SEQUENCEFILE -> TOK_TBLSEQUENCEFILE
+ | KW_STORED KW_AS KW_TEXTFILE -> TOK_TBLTEXTFILE
;
tableLocation
@@ -317,6 +337,7 @@
primitiveType
: KW_TINYINT -> TOK_TINYINT
+ | KW_SMALLINT -> TOK_SMALLINT
| KW_INT -> TOK_INT
| KW_BIGINT -> TOK_BIGINT
| KW_BOOLEAN -> TOK_BOOLEAN
@@ -420,23 +441,22 @@
selectList
:
- selectItem
- ( COMMA selectItem )* -> selectItem+
+ selectItem ( COMMA selectItem )* -> selectItem+
+ | trfmClause -> ^(TOK_SELEXPR trfmClause)
;
selectItem
:
- trfmClause -> ^(TOK_SELEXPR trfmClause)
- | (selectExpression (KW_AS Identifier)?) -> ^(TOK_SELEXPR selectExpression Identifier?)
+ ( selectExpression (KW_AS Identifier)?) -> ^(TOK_SELEXPR selectExpression Identifier?)
;
trfmClause
:
KW_TRANSFORM
- LPAREN columnList RPAREN
- KW_AS
- LPAREN aliasList RPAREN
- KW_USING StringLiteral -> ^(TOK_TRANSFORM columnList aliasList StringLiteral)
+ LPAREN expressionList RPAREN
+ KW_USING StringLiteral
+ (KW_AS LPAREN aliasList RPAREN)?
+ -> ^(TOK_TRANSFORM expressionList StringLiteral aliasList?)
;
selectExpression
@@ -448,18 +468,19 @@
tableAllColumns
:
- Identifier DOT STAR -> ^(TOK_ALLCOLREF Identifier)
+ STAR -> ^(TOK_ALLCOLREF)
+ | Identifier DOT STAR -> ^(TOK_ALLCOLREF Identifier)
;
// table.column
tableColumn
:
- (tab=Identifier)? DOT col=Identifier -> ^(TOK_COLREF $tab? $col)
+ (tab=Identifier DOT)? col=Identifier -> ^(TOK_COLREF $tab? $col)
;
-columnList
+expressionList
:
- tableColumn (COMMA tableColumn)* -> ^(TOK_COLLIST tableColumn+)
+ expression (COMMA expression)* -> ^(TOK_EXPLIST expression+)
;
aliasList
@@ -478,7 +499,7 @@
joinSource
:
fromSource
- ( joinToken^ fromSource (KW_ON! precedenceEqualExpression)? )+
+ ( joinToken^ fromSource (KW_ON! expression)? )+
;
joinToken
@@ -496,7 +517,7 @@
tableSample
:
- KW_TABLESAMPLE LPAREN KW_BUCKET (numerator=Number) KW_OUT KW_OF (denominator=Number) (KW_ON col+=Identifier (COMMA col+=Identifier)*)? RPAREN -> ^(TOK_TABLESAMPLE $numerator $denominator $col*)
+ KW_TABLESAMPLE LPAREN KW_BUCKET (numerator=Number) KW_OUT KW_OF (denominator=Number) (KW_ON expr+=expression (COMMA expr+=expression)*)? RPAREN -> ^(TOK_TABLESAMPLE $numerator $denominator $expr*)
;
tableSource
@@ -570,12 +591,12 @@
: // LEFT and RIGHT keywords are also function names
Identifier
LPAREN (
- (dist=KW_DISTINCT)?
- expression
- (COMMA expression)*
+ ((dist=KW_DISTINCT)?
+ expression
+ (COMMA expression)*)?
)?
- RPAREN -> {$dist == null}? ^(TOK_FUNCTION Identifier expression+)
- -> ^(TOK_FUNCTIONDI Identifier expression+)
+ RPAREN -> {$dist == null}? ^(TOK_FUNCTION Identifier (expression+)?)
+ -> ^(TOK_FUNCTIONDI Identifier (expression+)?)
;
@@ -644,7 +665,7 @@
precedenceUnaryExpression (precedenceBitwiseXorOperator^ precedenceUnaryExpression)*
;
-
+
precedenceStarOperator
:
STAR | DIVIDE | MOD
@@ -808,6 +829,7 @@
KW_COMMENT: 'COMMENT';
KW_BOOLEAN: 'BOOLEAN';
KW_TINYINT: 'TINYINT';
+KW_SMALLINT: 'SMALLINT';
KW_INT: 'INT';
KW_BIGINT: 'BIGINT';
KW_FLOAT: 'FLOAT';
@@ -834,6 +856,7 @@
KW_LINES: 'LINES';
KW_STORED: 'STORED';
KW_SEQUENCEFILE: 'SEQUENCEFILE';
+KW_TEXTFILE: 'TEXTFILE';
KW_LOCATION: 'LOCATION';
KW_TABLESAMPLE: 'TABLESAMPLE';
KW_BUCKET: 'BUCKET';
@@ -849,10 +872,12 @@
KW_FUNCTION: 'FUNCTION';
KW_EXPLAIN: 'EXPLAIN';
KW_EXTENDED: 'EXTENDED';
-KW_SERIALIZER: 'SERIALIZER';
+KW_SERDE: 'SERDE';
KW_WITH: 'WITH';
-KW_PROPERTIES: 'SERDEPROPERTIES';
+KW_SERDEPROPERTIES: 'SERDEPROPERTIES';
KW_LIMIT: 'LIMIT';
+KW_SET: 'SET';
+KW_PROPERTIES: 'TBLPROPERTIES';
// Operators
@@ -909,7 +934,7 @@
StringLiteral
:
- '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )*
+ ( '\'' (~'\'')* '\'' | '\"' (~'\"')* '\"' )+
;
CharSetLiteral
@@ -926,6 +951,7 @@
Identifier
:
(Letter | Digit) (Letter | Digit | '_')*
+ | '`' (Letter | Digit) (Letter | Digit | '_')* '`'
;
CharSetName