You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC
svn commit: r803312 [4/16] - in /hadoop/pig/trunk: ./ contrib/zebra/
contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/
contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/
contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/s...
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+//import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Pig LoadFunc and Slicer for Table
+ */
+public class TableLoader implements LoadFunc, Slicer {
+ static final Log LOG = LogFactory.getLog(TableLoader.class);
+ private TableInputFormat inputFormat;
+ private JobConf jobConf;
+ private String projectionString;
+ private Path[] paths;
+
+ /**
+ * default constructor. Creates the TableInputFormat used by slice() and
+ * leaves the projection string unset, in which case checkConf() does not
+ * set any projection on the job configuration.
+ */
+ public TableLoader() {
+ inputFormat = new TableInputFormat();
+ }
+
+ /**
+ * @param projectionStr
+ * projection string passed from pig query. It is stored as given
+ * here and only applied to the job configuration later, by
+ * checkConf() or fieldsToRead().
+ */
+ public TableLoader(String projectionStr) {
+ inputFormat = new TableInputFormat();
+ projectionString = projectionStr;
+ }
+
+ @Override
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ throw new IOException("Not implemented"); // stream binding is unsupported; records are read through slice() / TableSlice
+ }
+
+  /**
+   * Lazily builds the job configuration shared by determineSchema(),
+   * validate() and slice(). Returns immediately if it was already built.
+   *
+   * @param storage
+   *          supplies the base configuration properties.
+   * @param location
+   *          The location format follows the same convention as
+   *          FileInputFormat's comma-separated multiple path format.
+   * @throws IOException
+   *           if no input path resolves to a directory, or if the file
+   *           system fails while resolving the glob patterns.
+   */
+  private void checkConf(DataStorage storage, String location) throws IOException {
+    if (jobConf != null) {
+      return;
+    }
+
+    Configuration conf =
+        ConfigurationUtil.toConfiguration(storage.getConfiguration());
+    // Work on locals and publish to the fields only after every step has
+    // succeeded. Assigning jobConf up front would leave a half-built
+    // configuration behind on failure, and every later call would then skip
+    // validation because jobConf is no longer null.
+    JobConf newConf = new JobConf(conf);
+    newConf.setInputFormat(TableInputFormat.class);
+    // TODO: the following code may better be moved to TableInputFormat.
+    // Hack: use FileInputFormat to decode comma-separated multiple path
+    // format.
+    FileInputFormat.setInputPaths(newConf, location);
+    Path[] inputPaths = FileInputFormat.getInputPaths(newConf);
+
+    // Performing glob pattern matching; only directories are accepted.
+    List<Path> result = new ArrayList<Path>(inputPaths.length);
+    for (Path p : inputPaths) {
+      FileSystem fs = p.getFileSystem(newConf);
+      FileStatus[] matches = fs.globStatus(p);
+      if (matches == null) {
+        LOG.warn("Input path does not exist: " + p);
+      } else if (matches.length == 0) {
+        LOG.warn("Input Pattern " + p + " matches 0 files");
+      } else {
+        for (FileStatus globStat : matches) {
+          if (globStat.isDir()) {
+            result.add(globStat.getPath());
+          } else {
+            LOG.warn("Input path " + p + " is not a directory");
+          }
+        }
+      }
+    }
+    if (result.isEmpty()) {
+      throw new IOException("No table specified for input");
+    }
+
+    LOG.info("Total input tables to process : " + result.size());
+    TableInputFormat.setInputPaths(newConf, result.toArray(new Path[result.size()]));
+    if (projectionString != null) {
+      try {
+        TableInputFormat.setProjection(newConf, projectionString);
+      } catch (ParseException e) {
+        // Same unchecked type as before, but the parse error is kept as cause.
+        throw new RuntimeException("Schema parsing failed : " + e.getMessage(), e);
+      }
+    }
+
+    // The un-globbed input paths are kept; determineSchema() iterates them.
+    paths = inputPaths;
+    jobConf = newConf;
+  }
+
+  /**
+   * Determines the pig schema of the projected columns.
+   *
+   * A location without a comma is treated as a single table; otherwise the
+   * schemas of all input paths are unioned before the projection is applied.
+   *
+   * @param fileName
+   *          table location(s), comma-separated for a table union.
+   * @param execType
+   *          not used by this implementation.
+   * @param storage
+   *          supplies the configuration used to build the job configuration.
+   * @return the projection schema converted to a pig schema.
+   * @throws IOException
+   *           if a schema cannot be read, unioned, projected or converted;
+   *           the underlying parse or frontend exception is attached as
+   *           cause.
+   */
+  @Override
+  public Schema determineSchema(String fileName, ExecType execType,
+      DataStorage storage) throws IOException {
+    checkConf(storage, fileName);
+
+    org.apache.hadoop.zebra.types.Schema tableSchema;
+    if (!fileName.contains(",")) { // one table;
+      tableSchema = BasicTable.Reader.getSchema(new Path(fileName), jobConf);
+    } else { // table union;
+      tableSchema = new org.apache.hadoop.zebra.types.Schema();
+      for (Path p : paths) {
+        org.apache.hadoop.zebra.types.Schema schema = BasicTable.Reader.getSchema(p, jobConf);
+        try {
+          tableSchema.unionSchema(schema);
+        } catch (ParseException e) {
+          throw new IOException(e.getMessage(), e);
+        }
+      }
+    }
+
+    // Both cases project the (possibly unioned) table schema the same way.
+    org.apache.hadoop.zebra.types.Schema projectionSchema;
+    try {
+      Projection projection =
+          new Projection(tableSchema, TableInputFormat.getProjection(jobConf));
+      projectionSchema = projection.getProjectionSchema();
+    } catch (ParseException e) {
+      throw new IOException("Schema parsing failed : " + e.getMessage(), e);
+    }
+
+    if (projectionSchema == null) {
+      throw new IOException("Cannot determine table projection schema");
+    }
+
+    try {
+      return SchemaConverter.toPigSchema(projectionSchema);
+    } catch (FrontendException e) {
+      throw new IOException("FrontendException", e);
+    }
+  }
+
+  /**
+   * Re-applies the constructor-supplied projection string to the job
+   * configuration. The requested pig schema itself is currently ignored.
+   *
+   * @param schema
+   *          fields requested by pig; not used yet (see TODO below).
+   */
+  @Override
+  public void fieldsToRead(Schema schema) {
+    // chaow: this function never gets triggered in pig loader/storer test cases;
+    LOG.debug("fieldsToRead is invoked.");
+
+    // TODO: derive the projection from the requested pig schema (e.g. via
+    // SchemaConverter.fromPigSchema(schema)) instead of re-using
+    // projectionString.
+    if (jobConf == null || projectionString == null) {
+      // Nothing to apply: checkConf() sets projectionString itself when it
+      // builds jobConf, and it never sets a null projection.
+      return;
+    }
+    try {
+      TableInputFormat.setProjection(jobConf, projectionString);
+    } catch (ParseException e) {
+      throw new RuntimeException("Schema parsing failed : " + e.getMessage(), e);
+    }
+  }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ throw new IOException("Not implemented"); // tuples are produced by TableSlice.next(), not by the loader itself
+ }
+
+  /**
+   * Splits the input table(s) into slices, one per input split reported by
+   * the table input format.
+   *
+   * @param store
+   *          supplies the configuration used to build the job configuration.
+   * @param location
+   *          table location(s), comma-separated for multiple tables.
+   * @return one TableSlice per input split.
+   * @throws IOException
+   *           if the configuration cannot be built or splitting fails.
+   */
+  @Override
+  public Slice[] slice(DataStorage store, String location) throws IOException {
+    checkConf(store, location);
+    // TableInputFormat accepts numSplits < 0 (special case for no-hint)
+    final InputSplit[] tableSplits = inputFormat.getSplits(jobConf, -1);
+    final Slice[] result = new Slice[tableSplits.length];
+    int idx = 0;
+    for (InputSplit tableSplit : tableSplits) {
+      result[idx++] = new TableSlice(jobConf, tableSplit);
+    }
+    return result;
+  }
+
+ @Override
+ public void validate(DataStorage store, String location) throws IOException {
+ checkConf(store, location); // builds jobConf on first use; throws if no input path resolves to a table directory
+ }
+
+  /**
+   * Serializable Slice wrapping one InputSplit of a table.
+   *
+   * The job configuration is carried as a plain string map and the split is
+   * written through its own write()/readFields() methods; the record reader
+   * is only created by init() on the receiving side.
+   */
+  static class TableSlice implements Slice {
+    private static final long serialVersionUID = 1L;
+    private static final Class<?>[] NO_ARGS = new Class<?>[0];
+
+    private TreeMap<String, String> configMap;
+    private InputSplit split;
+
+    transient private JobConf conf;
+    transient private String projection;
+    transient private RecordReader<BytesWritable, Tuple> scanner;
+    transient private BytesWritable key;
+
+    /**
+     * @param conf
+     *          job configuration; all of its entries are copied.
+     * @param split
+     *          the input split this slice reads.
+     */
+    TableSlice(JobConf conf, InputSplit split) {
+      // hack: expecting JobConf contains nothing but a <string, string>
+      // key-value pair store.
+      configMap = new TreeMap<String, String>();
+      for (Map.Entry<String, String> e : conf) {
+        configMap.put(e.getKey(), e.getValue());
+      }
+      this.split = split;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (scanner == null) {
+        throw new IOException("Slice not initialized");
+      }
+      scanner.close();
+    }
+
+    @Override
+    public long getLength() {
+      try {
+        return split.getLength();
+      } catch (IOException e) {
+        throw new RuntimeException("IOException", e);
+      }
+    }
+
+    @Override
+    public String[] getLocations() {
+      try {
+        return split.getLocations();
+      } catch (IOException e) {
+        throw new RuntimeException("IOException", e);
+      }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      if (scanner == null) {
+        throw new IOException("Slice not initialized");
+      }
+      return scanner.getPos();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (scanner == null) {
+        throw new IOException("Slice not initialized");
+      }
+      return scanner.getProgress();
+    }
+
+    @Override
+    public long getStart() {
+      return 0;
+    }
+
+    /**
+     * Rebuilds the job configuration from the carried string map, reads the
+     * projection from it and opens the record reader for the split.
+     *
+     * @param store
+     *          not used by this implementation.
+     * @throws IOException
+     *           if the projection cannot be parsed or the reader cannot be
+     *           opened.
+     */
+    @Override
+    public void init(DataStorage store) throws IOException {
+      Configuration localConf = new Configuration();
+      for (Map.Entry<String, String> e : configMap.entrySet()) {
+        localConf.set(e.getKey(), e.getValue());
+      }
+      conf = new JobConf(localConf);
+      try {
+        projection = TableInputFormat.getProjection(conf);
+      } catch (ParseException e) {
+        throw new IOException("Schema parsing failed :" + e.getMessage(), e);
+      }
+      TableInputFormat inputFormat = new TableInputFormat();
+      scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
+      key = new BytesWritable();
+    }
+
+    /**
+     * Reads the next row into the given tuple.
+     *
+     * @return the value returned by the underlying record reader's next().
+     * @throws IOException
+     *           if init() has not been called, or if reading fails.
+     */
+    @Override
+    public boolean next(Tuple value) throws IOException {
+      // Same guard as close()/getPos()/getProgress(): without it a call
+      // before init() fails with a NullPointerException.
+      if (scanner == null) {
+        throw new IOException("Slice not initialized");
+      }
+      TypesUtils.formatTuple(value, projection);
+      return scanner.next(key, value);
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+      out.writeObject(configMap);
+      // The split's class name is written so readObject() can re-create it.
+      out.writeObject(split.getClass().getName());
+      split.write(out);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void readObject(ObjectInputStream in) throws IOException,
+        ClassNotFoundException {
+      configMap = (TreeMap<String, String>) in.readObject();
+      String className = (String) in.readObject();
+      Class<?> clazz = Class.forName(className);
+      try {
+        // asSubclass rejects a class that is not an InputSplit before its
+        // constructor gets a chance to run.
+        Constructor<? extends InputSplit> ctor =
+            clazz.asSubclass(InputSplit.class).getDeclaredConstructor(NO_ARGS);
+        ctor.setAccessible(true);
+        split = ctor.newInstance();
+      } catch (Exception e) {
+        throw new ClassNotFoundException("Cannot create instance", e);
+      }
+      split.readFields(in);
+    }
+  }
+
+ @Override
+ public DataBag bytesToBag(byte[] b) throws IOException {
+ throw new IOException("Not implemented"); // this and the bytesTo* methods below: raw-byte conversion is not provided by this loader
+ }
+
+ @Override
+ public String bytesToCharArray(byte[] b) throws IOException {
+ throw new IOException("Not implemented");
+ }
+
+ @Override
+ public Double bytesToDouble(byte[] b) throws IOException {
+ throw new IOException("Not implemented");
+ }
+
+ @Override
+ public Float bytesToFloat(byte[] b) throws IOException {
+ throw new IOException("Not implemented");
+ }
+
+ @Override
+ public Integer bytesToInteger(byte[] b) throws IOException {
+ throw new IOException("Not implemented");
+ }
+
+ @Override
+ public Long bytesToLong(byte[] b) throws IOException {
+ throw new IOException("Not implemented");
+ }
+
+ public Map<String, Object> bytesToMap(byte[] b) throws IOException { // NOTE(review): the only bytesTo* stub without @Override - confirm it matches the LoadFunc signature
+ throw new IOException("Not implemented");
+ }
+
+ @Override
+ public Tuple bytesToTuple(byte[] b) throws IOException {
+ throw new IOException("Not implemented");
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Pig StoreFunc for Table.
+ *
+ * The stream-level callbacks (bindTo, putNext, finish) are no-ops; this class
+ * only carries the schema and storage hint strings and names
+ * TableOutputFormat as its store preparation class.
+ */
+public class TableStorer implements StoreFunc {
+  private static final Class[] emptyArray = new Class[] {};
+
+  private String schemaString;
+  private String storageHintString;
+
+  /** Creates a storer with neither schema nor storage hint set. */
+  public TableStorer() {
+  }
+
+  /**
+   * @param schemaStr
+   *          table schema string, returned by getSchemaString().
+   * @param storageHintStr
+   *          storage hint string, returned by getStorageHintString().
+   */
+  public TableStorer(String schemaStr, String storageHintStr) throws ParseException, IOException {
+    this.schemaString = schemaStr;
+    this.storageHintString = storageHintStr;
+  }
+
+  @Override
+  public void bindTo(OutputStream os) throws IOException {
+    // intentionally empty
+  }
+
+  @Override
+  public void putNext(Tuple f) throws IOException {
+    // intentionally empty
+  }
+
+  @Override
+  public void finish() throws IOException {
+    // intentionally empty
+  }
+
+  @Override
+  public Class getStorePreparationClass() throws IOException {
+    return TableOutputFormat.class;
+  }
+
+  /** @return the schema string given at construction, or null. */
+  public String getSchemaString() {
+    return this.schemaString;
+  }
+
+  /** @return the storage hint string given at construction, or null. */
+  public String getStorageHintString() {
+    return this.storageHintString;
+  }
+
+  /**
+   * Looks up the no-argument constructor of TableOutputFormat and does
+   * nothing else with it.
+   */
+  static public void main(String[] args) throws SecurityException, NoSuchMethodException {
+    Constructor ctor = TableOutputFormat.class.getDeclaredConstructor(emptyArray);
+  }
+}
+
+/**
+ * Table OutputFormat.
+ *
+ * checkOutputSpecs() creates the table at the store location with the schema
+ * and storage hint of the configured TableStorer; getRecordWriter() hands out
+ * a TableRecordWriter per task.
+ */
+class TableOutputFormat implements OutputFormat<BytesWritable, Tuple> {
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    final String tableLocation = MapRedUtil.getStoreConfig(job).getLocation();
+    final TableStorer storer = (TableStorer) MapRedUtil.getStoreFunc(job);
+    // Create the table writer, then finish it immediately.
+    final BasicTable.Writer tableWriter = new BasicTable.Writer(
+        new Path(tableLocation),
+        storer.getSchemaString(),
+        storer.getStorageHintString(),
+        false,
+        job);
+    tableWriter.finish();
+  }
+
+  @Override
+  public RecordWriter<BytesWritable, Tuple> getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) throws IOException {
+    final TableRecordWriter recordWriter = new TableRecordWriter(name, job);
+    return recordWriter;
+  }
+}
+
+/**
+ * Table RecordWriter.
+ *
+ * Opens a writer on the table at the store location and inserts every record
+ * through a TableInserter obtained under the given name.
+ */
+class TableRecordWriter implements RecordWriter<BytesWritable, Tuple> {
+  // Empty key used in place of a null key.
+  final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
+  private BasicTable.Writer writer;
+  private TableInserter inserter;
+
+  /**
+   * @param name
+   *          name passed to the table writer when requesting the inserter.
+   * @param conf
+   *          job configuration holding the store configuration.
+   */
+  public TableRecordWriter(String name, JobConf conf) throws IOException {
+    final Path tablePath = new Path(MapRedUtil.getStoreConfig(conf).getLocation());
+
+    // TODO: how to get? 1) column group splits; 2) flag of sorted-ness,
+    // compression, etc.
+    this.writer = new BasicTable.Writer(tablePath, conf);
+    this.inserter = this.writer.getInserter(name, false);
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    // Close the inserter first, then finish the writer.
+    this.inserter.close();
+    this.writer.finish();
+  }
+
+  @Override
+  public void write(BytesWritable key, Tuple value) throws IOException {
+    final BytesWritable rowKey = (key != null) ? key : KEY0;
+    this.inserter.insert(rowKey, value);
+  }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.zebra.types.Schema;
+
+ /**
+ * ColumnGroup Schema. This object is first written to a schema file when the
+ * ColumnGroup is initially created, and is used to communicate meta
+ * information among writers.
+ */
+public class CGSchema {
+ private Version version;
+ private boolean sorted;
+ private String comparator;
+ private Schema schema;
+ private String compressor = "lzo2";
+ private String serializer = "pig";
+ // tmp schema file name, used as a flag of unfinished CG
+ private final static String SCHEMA_FILE = ".schema";
+ private final static String DEFAULT_COMPARATOR = "memcmp";
+ // schema version, should be same as BasicTable's most of the time
+ private final static Version SCHEMA_VERSION =
+ new Version((short) 1, (short) 0);
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{Compressor = ");
+ sb.append(compressor);
+ sb.append("}\n");
+ sb.append("{Serializer = ");
+ sb.append(serializer);
+ sb.append("}\n");
+ sb.append(schema.toString());
+
+ return sb.toString();
+ }
+
+ public static Path makeFilePath(Path parent) {
+ return new Path(parent, SCHEMA_FILE);
+ }
+
+ public static CGSchema load(FileSystem fs, Path parent) throws IOException, ParseException {
+ if (!exists(fs, parent)) return null;
+ CGSchema ret = new CGSchema();
+ ret.read(fs, parent);
+ return ret;
+ }
+
+ public CGSchema() {
+ this.version = SCHEMA_VERSION;
+ }
+
+ public CGSchema(Schema schema, boolean sorted) {
+ this.sorted = sorted;
+ this.comparator = (sorted) ? DEFAULT_COMPARATOR : "";
+ this.schema = schema;
+ this.version = SCHEMA_VERSION;
+ }
+
+ public CGSchema(Schema schema, boolean sorted, String serializer, String compressor) {
+ this(schema, sorted);
+ this.serializer = serializer;
+ this.compressor = compressor;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ CGSchema other = (CGSchema) obj;
+ return this.sorted == other.sorted
+ && this.comparator.equals(other.comparator)
+ && this.schema.equals(other.schema);
+ }
+
+ public static boolean exists(FileSystem fs, Path parent) {
+ try {
+ return fs.exists(makeFilePath(parent));
+ }
+ catch (IOException e) {
+ return false;
+ }
+ }
+
+ public static void drop(FileSystem fs, Path parent) throws IOException {
+ fs.delete(makeFilePath(parent), true);
+ }
+
+ public boolean isSorted() {
+ return sorted;
+ }
+
+ public String getComparator() {
+ return comparator;
+ }
+
+ public String getSerializer() {
+ return serializer;
+ }
+
+ public String getCompressor() {
+ return compressor;
+ }
+
+ public void create(FileSystem fs, Path parent) throws IOException {
+ FSDataOutputStream outSchema = fs.create(makeFilePath(parent), false);
+ version.write(outSchema);
+ WritableUtils.writeString(outSchema, schema.toString());
+ WritableUtils.writeVInt(outSchema, sorted ? 1 : 0);
+ WritableUtils.writeString(outSchema, comparator);
+ outSchema.close();
+ }
+
+ public void read(FileSystem fs, Path parent) throws IOException, ParseException {
+ FSDataInputStream in = fs.open(makeFilePath(parent));
+ version = new Version(in);
+ // verify compatibility against SCHEMA_VERSION
+ if (!version.compatibleWith(SCHEMA_VERSION)) {
+ new IOException("Incompatible versions, expecting: " + SCHEMA_VERSION
+ + "; found in file: " + version);
+ }
+ String s = WritableUtils.readString(in);
+ schema = new Schema(s);
+ sorted = WritableUtils.readVInt(in) == 1 ? true : false;
+ comparator = WritableUtils.readString(in);
+ in.close();
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+ }
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.pig.data.DataType;
+
+/**
+ * Column types of a table schema, each with its type name and the matching
+ * pig DataType code.
+ */
+public enum ColumnType implements Writable {
+  ANY("any") {
+    @Override
+    public byte pigDataType() {
+      return DataType.UNKNOWN;
+    }
+  },
+  INT("int") {
+    @Override
+    public byte pigDataType() {
+      return DataType.INTEGER;
+    }
+  },
+  LONG("long") {
+    @Override
+    public byte pigDataType() {
+      return DataType.LONG;
+    }
+  },
+  FLOAT("float") {
+    @Override
+    public byte pigDataType() {
+      return DataType.FLOAT;
+    }
+  },
+  DOUBLE("double") {
+    @Override
+    public byte pigDataType() {
+      return DataType.DOUBLE;
+    }
+  },
+  BOOL("bool") {
+    @Override
+    public byte pigDataType() {
+      return DataType.BOOLEAN;
+    }
+  },
+  COLLECTION("collection") {
+    @Override
+    public byte pigDataType() {
+      return DataType.BAG;
+    }
+  },
+  MAP("map") {
+    @Override
+    public byte pigDataType() {
+      return DataType.MAP;
+    }
+  },
+  RECORD("record") {
+    @Override
+    public byte pigDataType() {
+      return DataType.TUPLE;
+    }
+  },
+  STRING("string") {
+    @Override
+    public byte pigDataType() {
+      return DataType.CHARARRAY;
+    }
+  },
+  BYTES("bytes") {
+    @Override
+    public byte pigDataType() {
+      return DataType.BYTEARRAY;
+    }
+  };
+
+  private final String name;
+
+  private ColumnType(String name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the pig DataType code this column type corresponds to.
+   */
+  public abstract byte pigDataType();
+
+  /**
+   * To get the type based on the type name string.
+   *
+   * The lookup ignores case. The name is upper-cased with a fixed locale so
+   * that the result does not depend on the JVM default locale (under a
+   * Turkish locale "int" would otherwise not upper-case to "INT").
+   *
+   * @param name name of the type
+   *
+   * @return ColumnType Enum for the type
+   * @throws IllegalArgumentException if no constant matches the name
+   */
+  public static ColumnType getTypeByName(String name) {
+    return ColumnType.valueOf(name.toUpperCase(java.util.Locale.ENGLISH));
+  }
+
+  /**
+   * @param dt pig DataType code
+   * @return the first column type mapped to that code, or null if none is.
+   */
+  public static ColumnType getTypeByPigDataType(byte dt) {
+    for (ColumnType ct : ColumnType.values()) {
+      if (ct.pigDataType() == dt) {
+        return ct;
+      }
+    }
+    return null;
+  }
+
+  public static String findTypeName(ColumnType columntype) {
+    return columntype.getName();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // no op, instantiated by the caller
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Utils.writeString(out, name);
+  }
+
+  /**
+   * @return true for the types RECORD, MAP and COLLECTION.
+   */
+  public static boolean isSchemaType(ColumnType columnType) {
+    return ((columnType == RECORD) || (columnType == MAP) || (columnType == COLLECTION));
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.pig.data.DataType;
+
+/**
+ * Field types, each with its type name, the matching pig DataType code and,
+ * for nested types, the maximum number of nested fields.
+ */
+public enum FieldType implements Writable {
+  INT("int") {
+    @Override
+    public byte pigDataType() {
+      return DataType.INTEGER;
+    }
+  },
+
+  LONG("long") {
+    @Override
+    public byte pigDataType() {
+      return DataType.LONG;
+    }
+  },
+
+  FLOAT("float") {
+    @Override
+    public byte pigDataType() {
+      return DataType.FLOAT;
+    }
+  },
+
+  DOUBLE("double") {
+    @Override
+    public byte pigDataType() {
+      return DataType.DOUBLE;
+    }
+  },
+
+  BOOL("bool") {
+    @Override
+    public byte pigDataType() {
+      return DataType.BOOLEAN;
+    }
+  },
+
+  COLLECTION("collection") {
+    @Override
+    public byte pigDataType() {
+      return DataType.BAG;
+    }
+
+    @Override
+    public boolean isNested() {
+      return true;
+    }
+
+    @Override
+    public int getMaxNumsNestedField() {
+      return MAX_NUM_FIELDS;
+    }
+  },
+
+  MAP("map") {
+    @Override
+    public byte pigDataType() {
+      return DataType.MAP;
+    }
+
+    @Override
+    public boolean isNested() {
+      return true;
+    }
+
+    @Override
+    public int getMaxNumsNestedField() {
+      // map can only contain one field for values
+      return 1;
+    }
+  },
+
+  RECORD("record") {
+    @Override
+    public byte pigDataType() {
+      return DataType.TUPLE;
+    }
+
+    @Override
+    public boolean isNested() {
+      return true;
+    }
+
+    @Override
+    public int getMaxNumsNestedField() {
+      return MAX_NUM_FIELDS;
+    }
+  },
+
+  STRING("string") {
+    @Override
+    public byte pigDataType() {
+      return DataType.CHARARRAY;
+    }
+  },
+
+  BYTES("bytes") {
+    @Override
+    public byte pigDataType() {
+      // NOTE(review): ColumnType.BYTES maps to DataType.BYTEARRAY while this
+      // maps to BIGCHARARRAY - confirm which mapping is intended.
+      return DataType.BIGCHARARRAY;
+    }
+  };
+
+  public /*final*/ static int MAX_NUM_FIELDS = 4096;
+
+  // pig DataType code -> FieldType, filled once in the static block below
+  private static final Map<Byte, FieldType> mapPigDT2This =
+      new HashMap<Byte, FieldType>(FieldType.values().length);
+
+  private final String name;
+
+  static {
+    for (FieldType t : FieldType.values()) {
+      mapPigDT2This.put(t.pigDataType(), t);
+    }
+  }
+
+  private FieldType(String name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the pig DataType code this field type corresponds to.
+   */
+  public abstract byte pigDataType();
+
+  /**
+   * To get the type based on the name of the type.
+   *
+   * The lookup ignores case. The name is upper-cased with a fixed locale so
+   * that the result does not depend on the JVM default locale (under a
+   * Turkish locale "int" would otherwise not upper-case to "INT").
+   *
+   * @param name name of the type
+   *
+   * @return FieldType Enum for the type
+   * @throws IllegalArgumentException if no constant matches the name
+   */
+  public static FieldType getTypeByName(String name) {
+    return FieldType.valueOf(name.toUpperCase(java.util.Locale.ENGLISH));
+  }
+
+  /**
+   * @param dt pig DataType code
+   * @return the field type mapped to that code, or null if none is.
+   */
+  public static FieldType getTypeByPigDataType(byte dt) {
+    return mapPigDT2This.get(dt);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  /**
+   * @return false unless overridden by a nested type (COLLECTION, MAP,
+   *         RECORD).
+   */
+  public boolean isNested() {
+    return false;
+  }
+
+  /**
+   * @return 0 unless overridden by a nested type.
+   */
+  public int getMaxNumsNestedField() {
+    return 0;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // no op, instantiated by the caller
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Utils.writeString(out, name);
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Generated By:JavaCC: Do not edit this line. ParseException.java Version 4.1 */
+/* JavaCCOptions:KEEP_LINE_COL=null */
+package org.apache.hadoop.zebra.types;
+
+/**
+ * This exception is thrown when parse errors are encountered.
+ * You can explicitly create objects of this exception type by
+ * calling the method generateParseException in the generated
+ * parser.
+ *
+ * You can modify this class to customize your error reporting
+ * mechanisms so long as you retain the public fields.
+ */
+public class ParseException extends Exception {
+
+  /**
+   * The version identifier for this Serializable class.
+   * Increment only if the <i>serialized</i> form of the
+   * class changes.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The last token that was consumed successfully. If this object was
+   * created because of a parse error, the token following this token is
+   * the first error token.
+   */
+  public Token currentToken;
+
+  /**
+   * Each entry in this array is an array of integers. Each array
+   * of integers represents a sequence of tokens (by their ordinal
+   * values) that is expected at this point of the parse.
+   */
+  public int[][] expectedTokenSequences;
+
+  /**
+   * A reference to the "tokenImage" array of the generated parser within
+   * which the parse error occurred. This array is defined in the generated
+   * ...Constants interface.
+   */
+  public String[] tokenImage;
+
+  /**
+   * The end of line string for this machine.
+   */
+  protected String eol = System.getProperty("line.separator", "\n");
+
+  /**
+   * Constructor used by the method "generateParseException" in the
+   * generated parser. The exception message is built from the arguments,
+   * and the fields "currentToken", "expectedTokenSequences" and
+   * "tokenImage" are set from them.
+   */
+  public ParseException(Token currentTokenVal,
+                        int[][] expectedTokenSequencesVal,
+                        String[] tokenImageVal) {
+    super(initialise(currentTokenVal, expectedTokenSequencesVal, tokenImageVal));
+    this.currentToken = currentTokenVal;
+    this.expectedTokenSequences = expectedTokenSequencesVal;
+    this.tokenImage = tokenImageVal;
+  }
+
+  /**
+   * The following constructors are for general use. An exception built
+   * with them behaves as documented in the class "Throwable"; the fields
+   * "currentToken", "expectedTokenSequences" and "tokenImage" are left
+   * unset. The JavaCC generated code does not use these constructors.
+   */
+  public ParseException() {
+    super();
+  }
+
+  /** Constructor with message. */
+  public ParseException(String message) {
+    super(message);
+  }
+
+  /**
+   * Builds the parse error message from "currentToken" and
+   * "expectedTokenSequences". The message names the tokens that follow
+   * currentToken, their position, and every expected token sequence.
+   */
+  private static String initialise(Token currentToken,
+                                   int[][] expectedTokenSequences,
+                                   String[] tokenImage) {
+    final String lineEnd = System.getProperty("line.separator", "\n");
+
+    // Render every expected sequence on its own line and remember the
+    // longest one: that many encountered tokens are quoted further down.
+    StringBuilder expectedText = new StringBuilder();
+    int longest = 0;
+    for (int[] sequence : expectedTokenSequences) {
+      longest = Math.max(longest, sequence.length);
+      for (int kind : sequence) {
+        expectedText.append(tokenImage[kind]).append(' ');
+      }
+      // a sequence that does not end with token kind 0 is shown as open-ended
+      if (sequence[sequence.length - 1] != 0) {
+        expectedText.append("...");
+      }
+      expectedText.append(lineEnd).append(" ");
+    }
+
+    StringBuilder message = new StringBuilder("Encountered \"");
+    Token offending = currentToken.next;
+    for (int shown = 0; shown < longest; shown++) {
+      if (shown != 0) {
+        message.append(" ");
+      }
+      if (offending.kind == 0) {
+        // token kind 0 ends the listing
+        message.append(tokenImage[0]);
+        break;
+      }
+      message.append(" ").append(tokenImage[offending.kind]);
+      message.append(" \"");
+      message.append(add_escapes(offending.image));
+      message.append(" \"");
+      offending = offending.next;
+    }
+    message.append("\" at line ").append(currentToken.next.beginLine)
+        .append(", column ").append(currentToken.next.beginColumn);
+    message.append(".").append(lineEnd);
+    if (expectedTokenSequences.length == 1) {
+      message.append("Was expecting:");
+    } else {
+      message.append("Was expecting one of:");
+    }
+    message.append(lineEnd).append(" ");
+    message.append(expectedText.toString());
+    return message.toString();
+  }
+
+  /**
+   * Used to convert raw characters to their escaped version
+   * when these raw versions cannot be used as part of an ASCII
+   * string literal. NUL characters are dropped; other characters outside
+   * the range 0x20..0x7e are written as four-digit unicode escapes.
+   */
+  static String add_escapes(String str) {
+    StringBuilder escaped = new StringBuilder();
+    for (int i = 0; i < str.length(); i++) {
+      final char ch = str.charAt(i);
+      switch (ch) {
+        case 0:
+          break;
+        case '\b':
+          escaped.append("\\b");
+          break;
+        case '\t':
+          escaped.append("\\t");
+          break;
+        case '\n':
+          escaped.append("\\n");
+          break;
+        case '\f':
+          escaped.append("\\f");
+          break;
+        case '\r':
+          escaped.append("\\r");
+          break;
+        case '\"':
+          escaped.append("\\\"");
+          break;
+        case '\'':
+          escaped.append("\\\'");
+          break;
+        case '\\':
+          escaped.append("\\\\");
+          break;
+        default:
+          if (ch < 0x20 || ch > 0x7e) {
+            String hex = "0000" + Integer.toString(ch, 16);
+            escaped.append("\\u").append(hex.substring(hex.length() - 4, hex.length()));
+          } else {
+            escaped.append(ch);
+          }
+          break;
+      }
+    }
+    return escaped.toString();
+  }
+
+}
+/* JavaCC - OriginalChecksum=90dec5ae3b18277c2181d26949c6c608 (do not edit this line) */
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,1299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import org.apache.pig.data.Tuple;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.StringReader;
+import org.apache.hadoop.io.BytesWritable;
+import java.io.IOException;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * Partition is a column group management class. Its responsibilities include
+ * storing column groups and handling column group splitting and stitching for
+ * insertions and queries, respectively.
+ */
+public class Partition {
+ /** Kinds of split that can be set on a (sub)column. */
+ public enum SplitType {
+   NONE,
+   RECORD,
+   COLLECTION,
+   MAP
+ }
+
+ /*
+ * This class holds the column group information as generated by
+ * TableStorageParser.
+ */
+ class PartitionInfo {
+ private Map<Schema.ColumnSchema, PartitionFieldInfo> fieldMap =
+ new HashMap<Schema.ColumnSchema, PartitionFieldInfo>();
+ private Map<String, HashSet<ColumnMappingEntry>> mColMap =
+ new HashMap<String, HashSet<ColumnMappingEntry>>();
+ private Schema mSchema;
+
+ /**
+  * Creates an empty partition info for the given table schema.
+  */
+ public PartitionInfo(Schema schema) {
+   this.mSchema = schema;
+ }
+
+ /**
+  * Returns the live (not copied) map from column group name to its
+  * column mapping entries.
+  */
+ public Map<String, HashSet<ColumnMappingEntry>> getColMap() {
+   return this.mColMap;
+ }
+
+ /*
+ * holds a mapping between a column in the table schema and its
+ * corresponding (sub)column group index and the field index inside the
+ * column group
+ */
+ class ColumnMappingEntry implements Comparable<ColumnMappingEntry> {
+   // column group index and field index inside it; both stay -1 until set
+   private int cgIndex = -1, fieldIndex = -1;
+   private Schema.ColumnSchema fs;
+   // MAP keys attached to this entry; null until addKeys() is first called
+   private HashSet<String> keySet;
+
+   public ColumnMappingEntry(int ri, int fi, Schema.ColumnSchema fs) {
+     cgIndex = ri;
+     fieldIndex = fi;
+     this.fs = fs;
+   }
+
+   /** Creates an entry for which invalid() returns true. */
+   public ColumnMappingEntry() {
+   }
+
+   /**
+    * add map keys
+    * return false if any key already exists but no rollback!
+    * (keys added before the duplicate was met stay in the set, and the
+    * keys after it are not added)
+    */
+   public boolean addKeys(HashSet<String> keys)
+   {
+     if (keySet == null)
+       keySet = new HashSet<String>();
+     for (String key : keys)
+     {
+       if (!keySet.add(key))
+         return false;
+     }
+     return true;
+   }
+
+   public int getCGIndex() {
+     return cgIndex;
+   }
+
+   public int getFieldIndex() {
+     return fieldIndex;
+   }
+
+   public Schema.ColumnSchema getColumnSchema() {
+     return fs;
+   }
+
+   /** Returns true only when neither index has been set. */
+   public boolean invalid() {
+     return (cgIndex == -1 && fieldIndex == -1);
+   }
+
+   /**
+    * Orders entries by column group index, then by field index.
+    * Uses explicit comparisons instead of subtraction so that the result
+    * cannot be corrupted by integer overflow.
+    */
+   public int compareTo(ColumnMappingEntry anotherEntry) {
+     int otherCG = anotherEntry.getCGIndex();
+     if (cgIndex != otherCG)
+       return cgIndex < otherCG ? -1 : 1;
+     int otherField = anotherEntry.getFieldIndex();
+     if (fieldIndex == otherField)
+       return 0;
+     return fieldIndex < otherField ? -1 : 1;
+   }
+
+   /** Returns the attached MAP keys, or null if none were ever added. */
+   HashSet<String> getKeys() {
+     return keySet;
+   }
+ }
+
+ /**
+ * This class holds the column group info for a (sub)column which is a unit
+ * in a column group
+ */
+ class PartitionFieldInfo {
+ private HashSet<PartitionInfo.ColumnMappingEntry> mSplitMaps =
+ new HashSet<ColumnMappingEntry>();
+ private HashSet<String> mSplitColumns = new HashSet<String>();
+ private ColumnMappingEntry mCGIndex = null;
+ private String mCGName = null; // fully qualified name
+ private SplitType stype = SplitType.NONE;
+ private boolean splitChild;
+
+ /**
+  * set a MAP key split (sub)column
+  * returns false if sanity check fails, i.e. if addKeys() on the newly
+  * created mapping entry reports a duplicate key
+  */
+ boolean setKeyCGIndex(int ri, int fi, String name, Schema.ColumnSchema fs, HashSet<String> keys) {
+   ColumnMappingEntry entry = new ColumnMappingEntry(ri, fi, fs);
+   // multiple map splits on one MAP column is allowed!
+   mSplitColumns.add(name);
+   mSplitMaps.add(entry);
+   return entry.addKeys(keys);
+ }
+
+ /**
+ * set a record field split (sub)column
+ */
+ boolean setCGIndex(int ri, int fi, String name, Schema.ColumnSchema fs) {
+ if (mCGIndex != null) return false;
+ mCGIndex = new Partition.PartitionInfo.ColumnMappingEntry(ri, fi, fs);
+ mCGName = name;
+ return true;
+ }
+
+ /** Returns the assigned mapping entry, or null if none has been set. */
+ ColumnMappingEntry getCGIndex() {
+   return this.mCGIndex;
+ }
+
+ /** Returns the fully qualified column group name, or null if unset. */
+ String getCGName() {
+   return this.mCGName;
+ }
+
+ /**
+ * set the split type of a (sub)column
+ */
+ void setSplit(SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
+ if (st == stype)
+ {
+ // multiple MAP splits of a field and its children on different keys are ok
+ if (st == SplitType.MAP || cst == SplitType.MAP || splitChild == this.splitChild)
+ return;
+ }
+ if (stype != SplitType.NONE) {
+ if (childName != null)
+ name = name + "." + childName;
+ throw new ParseException("Different Split Types Set on the same field: " + name);
+ }
+ stype = st;
+ this.splitChild = splitChild;
+ if (mSplitColumns.contains(name)) {
+ if (childName != null)
+ name = name + "." + childName;
+ throw new ParseException("Split on "+name+" are set more than once");
+ }
+ mSplitColumns.add(name);
+ }
+
+ /*
+ * creates a default "catch-all" column schema if necessary
+ */
+ void generateDefaultCGSchema(Schema.ColumnSchema fs0, Schema schema,
+ int defaultCGIndex,
+ Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap, String prefix,
+ Map<Schema.ColumnSchema, PartitionFieldInfo> fmap)
+ throws ParseException {
+ if (schema == null) throw new AssertionError("Interal Logic Error.");
+ if (prefix == null) prefix = fs0.name;
+ else if (fs0.name != null && !fs0.name.isEmpty())
+ prefix += "." + fs0.name;
+ Schema.ColumnSchema fs;
+ for (int i = 0; i < fs0.schema.getNumColumns(); i++) {
+ fs = fs0.schema.getColumn(i);
+ PartitionFieldInfo pi;
+ if ((pi = fmap.get(fs)) == null)
+ fmap.put(fs, pi = new PartitionFieldInfo());
+
+ /*
+ * won't go down for MAP split because only one level MAP split is
+ * supported now
+ */
+ if (pi.stype != SplitType.NONE && pi.stype != SplitType.MAP) {
+ /* go to the lower level */
+ pi.generateDefaultCGSchema(fs, schema, defaultCGIndex, colmap,
+ prefix, fmap);
+ }
+ else if (pi.mCGIndex != null) {
+ HashSet<ColumnMappingEntry> cms = mColMap.get(pi.mCGName);
+ if (cms == null)
+ {
+ cms = new HashSet<ColumnMappingEntry>();
+ colmap.put(pi.mCGName, cms);
+ }
+ cms.add(pi.mCGIndex);
+ if (!pi.mSplitMaps.isEmpty()) {
+ for (Iterator<ColumnMappingEntry> it = pi.mSplitMaps.iterator();
+ it.hasNext(); )
+ {
+ cms.add(it.next());
+ }
+ }
+ } else {
+ HashSet<ColumnMappingEntry> cms = colmap.get(prefix+"."+fs.name);
+ if (cms == null)
+ {
+ cms = new HashSet<ColumnMappingEntry>();
+ colmap.put(prefix+"."+fs.name, cms);
+ }
+ pi.mCGIndex = new ColumnMappingEntry(defaultCGIndex,
+ schema.getNumColumns(), fs);
+ pi.mCGName = prefix+"."+fs.name;
+ cms.add(pi.mCGIndex);
+
+ schema.add(new Schema.ColumnSchema(prefix + "." + fs.name,
+ fs.schema, fs.type));
+ if (!pi.mSplitMaps.isEmpty()) {
+ for (Iterator<ColumnMappingEntry> it = pi.mSplitMaps.iterator();
+ it.hasNext(); )
+ {
+ cms.add(it.next());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * set a MAP key split (sub)column
+ */
+ boolean setKeyCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name, HashSet<String> keys) {
+ PartitionFieldInfo pi;
+ if ((pi = fieldMap.get(fs)) == null) {
+ pi = new PartitionFieldInfo();
+ fieldMap.put(fs, pi);
+ }
+
+ return pi.setKeyCGIndex(ri, fi, name, fs.schema.getColumn(0), keys);
+ }
+
+ /**
+  * set a record field split (sub)column
+  *
+  * The field info of fs is created on first use; the result is whatever
+  * the field info's own setCGIndex reports.
+  */
+ boolean setCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name) {
+   PartitionFieldInfo info = fieldMap.get(fs);
+   if (info == null) {
+     info = new PartitionFieldInfo();
+     fieldMap.put(fs, info);
+   }
+   return info.setCGIndex(ri, fi, name, fs);
+ }
+
+ /**
+  * set the split type of a (sub)column
+  *
+  * The field info of fs is created on first use and the request is
+  * forwarded to it unchanged.
+  */
+ void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
+   PartitionFieldInfo info = fieldMap.get(fs);
+   if (info == null) {
+     info = new PartitionFieldInfo();
+     fieldMap.put(fs, info);
+   }
+   info.setSplit(st, cst, name, childName, splitChild);
+ }
+
+ /*
+  * creates a default "catch-all" CG schema if necessary
+  *
+  * Every top-level column of the table schema is visited. Columns with a
+  * non-MAP split are delegated to their field info; columns that already
+  * have a column group entry are registered in the column map; all other
+  * columns are appended to the default schema and mapped to defaultCGIndex.
+  * Returns null when no column ended up in the default schema.
+  */
+ public CGSchema generateDefaultCGSchema(String compressor,
+     String serializer, final int defaultCGIndex) throws ParseException {
+   Schema schema = new Schema();
+   final int columnCount = mSchema.getNumColumns();
+   for (int i = 0; i < columnCount; i++) {
+     Schema.ColumnSchema fs = mSchema.getColumn(i);
+     PartitionFieldInfo pi = fieldMap.get(fs);
+     if (pi == null) {
+       pi = new PartitionFieldInfo();
+       fieldMap.put(fs, pi);
+     }
+
+     /*
+      * won't go down for MAP split because only one level MAP split is
+      * supported now
+      */
+     if (pi.stype != SplitType.NONE && pi.stype != SplitType.MAP) {
+       /* go to the lower level */
+       pi.generateDefaultCGSchema(fs, schema, defaultCGIndex, mColMap, null,
+           fieldMap);
+       continue;
+     }
+
+     if (pi.mCGIndex != null) {
+       // already assigned to a column group: register the existing entry
+       HashSet<ColumnMappingEntry> assigned = mColMap.get(pi.mCGName);
+       if (assigned == null) {
+         assigned = new HashSet<ColumnMappingEntry>();
+         mColMap.put(pi.mCGName, assigned);
+       }
+       assigned.add(pi.mCGIndex);
+       continue;
+     }
+
+     // unassigned: the column becomes the next field of the default CG
+     HashSet<ColumnMappingEntry> defaults = mColMap.get(fs.name);
+     if (defaults == null) {
+       defaults = new HashSet<ColumnMappingEntry>();
+       mColMap.put(fs.name, defaults);
+     }
+     final int fieldIndex = schema.getNumColumns();
+     defaults.add(new ColumnMappingEntry(defaultCGIndex, fieldIndex, fs));
+     setCGIndex(fs, defaultCGIndex, schema.getNumColumns(), fs.name);
+     schema.add(fs);
+     defaults.addAll(pi.mSplitMaps);
+   }
+   if (schema.getNumColumns() == 0) {
+     return null;
+   }
+   return new CGSchema(schema, false, serializer, compressor);
+ }
+
+ /**
+  * returns "hash key-to-(sub)column" map on a (sub)column which is MAP-split
+  * across different hash keys
+  *
+  * The field info of fs is created on first use, so the result is never
+  * null; it is the live set, not a copy. The lookup uses this instance's
+  * own field map rather than going through the enclosing Partition's
+  * mPartitionInfo reference, so it also works on an instance that has not
+  * (yet) been stored there.
+  */
+ public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
+     Schema.ColumnSchema fs) {
+   PartitionFieldInfo pi = fieldMap.get(fs);
+   if (pi == null) {
+     pi = new PartitionFieldInfo();
+     fieldMap.put(fs, pi);
+   }
+   return pi.mSplitMaps;
+ }
+ }
+
+ /*
+ * This class records all column groups required and their corresponding users
+ * of tuples and their projections in turn.
+ */
+ private class CGEntry {
+   /* reference to needing partitioned columns */
+   private ArrayList<PartitionedColumn> mSources = null;
+   private ArrayList<String> mProjections = null; // projections
+   private int mSize = 0; // number of users
+   private int mCGIndex = -1;
+   private Tuple mTuple = null;
+   // map of a projection index to a set of interested hash keys
+   private HashMap<Integer, HashSet<String>> mKeyMap;
+
+   CGEntry(int cgindex) {
+     mSources = new ArrayList<PartitionedColumn>();
+     mProjections = new ArrayList<String>();
+     mCGIndex = cgindex;
+   }
+
+   /**
+    * Registers a user of this column group; the user is told its position
+    * in the projection list.
+    */
+   void addUser(Partition.PartitionedColumn src, String projection) {
+     mSources.add(src);
+     mProjections.add(projection);
+     final int slot = mSize++;
+     src.setProjIndex(slot);
+   }
+
+   /**
+    * Registers a user together with the hash keys it is interested in.
+    */
+   void addUser(Partition.PartitionedColumn src, String projection, HashSet<String> keys) {
+     mSources.add(src);
+     mProjections.add(projection);
+     if (mKeyMap == null) {
+       mKeyMap = new HashMap<Integer, HashSet<String>>();
+     }
+     final int slot = mSize++;
+     mKeyMap.put(slot, keys);
+     src.setProjIndex(slot);
+   }
+
+   /**
+    * Copies the record of every user into the column group tuple at the
+    * user's projection index. The key argument is not used.
+    */
+   void insert(final BytesWritable key) throws ExecException {
+     for (int i = 0; i < mSize; i++) {
+       PartitionedColumn user = mSources.get(i);
+       mTuple.set(user.getProjIndex(), user.getRecord());
+     }
+   }
+
+   /**
+    * Hands every user the tuple field at the user's projection index.
+    */
+   void read() throws ExecException {
+     for (int i = 0; i < mSize; i++) {
+       PartitionedColumn user = mSources.get(i);
+       user.setRecord(mTuple.get(user.getProjIndex()));
+     }
+   }
+
+   void setSource(Tuple tuple) {
+     mTuple = tuple;
+   }
+
+   int getCGIndex() {
+     return mCGIndex;
+   }
+
+   /**
+    * return untyped projection schema: the projections joined by ",",
+    * each followed by "#{k1|k2|...}" when hash keys were registered for it
+    */
+   String getProjection() throws ParseException {
+     StringBuilder text = new StringBuilder();
+     for (int i = 0; i < mProjections.size(); i++) {
+       if (i > 0) text.append(",");
+       text.append(mProjections.get(i));
+
+       HashSet<String> keySet = (mKeyMap == null) ? null : mKeyMap.get(i);
+       if (keySet == null)
+         continue;
+       if (keySet.isEmpty())
+         throw new AssertionError(
+             "Internal Logical Error: Empty key map.");
+
+       text.append("#{");
+       boolean first = true;
+       for (String hashKey : keySet) {
+         if (!first)
+           text.append("|");
+         text.append(hashKey);
+         first = false;
+       }
+       text.append("}");
+     }
+     return text.toString();
+   }
+ }
+
+ /**
+ * stitch and split execution class
+ */
+ private class PartitionedColumn {
+   private ArrayList<PartitionedColumn> mChildren = null; // partitioned children
+   private int mChildrenLen = 0;
+   private int mFieldIndex = -1; // field index in parent
+   // field index in CG projection: only used by a leaf
+   private int mProjIndex = -1;
+   private Partition.SplitType mSplitType = Partition.SplitType.NONE;
+   // current record: used as a Tuple or, for a split column, as a Map
+   private Object mTuple = null;
+   private boolean mNeedTmpTuple;
+   private HashSet<String> mKeys; // interested hash keys
+
+   PartitionedColumn(int fi, boolean needTmpTuple)
+       throws IOException {
+     mNeedTmpTuple = needTmpTuple;
+     mFieldIndex = fi;
+   }
+
+   PartitionedColumn(int fi, Partition.SplitType st,
+       boolean needTmpTuple) throws IOException {
+     this(fi, needTmpTuple);
+     mSplitType = st;
+   }
+
+   void setKeys(HashSet<String> keys) {
+     mKeys = keys;
+   }
+
+   /**
+    * stitch op: gathers the records of the children into this column's
+    * record -- by field index for an unsplit column, or by merging the
+    * children's maps otherwise
+    */
+   @SuppressWarnings("unchecked")
+   void stitch() throws IOException {
+     if (mSplitType == Partition.SplitType.NONE) {
+       for (int idx = 0; idx < mChildrenLen; idx++) {
+         PartitionedColumn kid = mChildren.get(idx);
+         ((Tuple) mTuple).set(kid.mFieldIndex, kid.getRecord());
+       }
+       return;
+     }
+     // stitch MAP-key partitioned hashes
+     for (int idx = 0; idx < mChildrenLen; idx++) {
+       PartitionedColumn kid = mChildren.get(idx);
+       // add the new (key,value) to the existing map
+       Map<String, Object> part = (Map<String, Object>) kid.getRecord();
+       ((Map<String, Object>) mTuple).putAll(part);
+     }
+   }
+
+   /**
+    * split op: distributes this column's record over the children -- by
+    * field index for an unsplit column, or by moving each child's keys out
+    * of a copy of the map otherwise
+    */
+   @SuppressWarnings("unchecked")
+   void split() throws IOException {
+     if (mSplitType == SplitType.NONE) {
+       // record split
+       for (int idx = 0; idx < mChildrenLen; idx++) {
+         PartitionedColumn kid = mChildren.get(idx);
+         kid.setRecord(((Tuple) mTuple).get(kid.mFieldIndex));
+       }
+       return;
+     }
+
+     // split MAP columns excluding the hashes already in split key CGs
+
+     // make a clone so the input MAP is intact after keys are yanked
+     mTuple = ((HashMap<String, Object>) mTuple).clone();
+     Map<String, Object> remaining = (Map<String, Object>) mTuple;
+     for (int idx = 0; idx < mChildrenLen; idx++) {
+       PartitionedColumn kid = mChildren.get(idx);
+       Map<String, Object> kidMap = (Map<String, Object>) kid.getRecord();
+       for (String hashKey : kid.mKeys) {
+         Object value = remaining.get(hashKey);
+         if (value == null)
+           continue;
+         kidMap.put(hashKey, value);
+         remaining.remove(hashKey);
+       }
+     }
+   }
+
+   Object getRecord() {
+     return mTuple;
+   }
+
+   void setRecord(Object t) {
+     mTuple = t;
+   }
+
+   void addChild(PartitionedColumn child) {
+     if (mChildren == null) {
+       mChildren = new ArrayList<PartitionedColumn>();
+     }
+     mChildren.add(child);
+     mChildrenLen++;
+   }
+
+   void setProjIndex(int projindex) {
+     mProjIndex = projindex;
+   }
+
+   int getProjIndex() {
+     return mProjIndex;
+   }
+
+   /**
+    * creates the temporary tuple if this column was flagged as needing
+    * one; it has one field per child, or a single field without children
+    */
+   void createTmpTuple() throws IOException {
+     if (!mNeedTmpTuple)
+       return;
+     int size = (mChildrenLen > 0 ? mChildrenLen : 1);
+     mTuple = TypesUtils.createTuple(size);
+   }
+
+   /**
+    * create maps if necessary
+    */
+   void createMap()
+   {
+     mTuple = new HashMap<String, Object>();
+   }
+
+   /**
+    * clear map
+    */
+   @SuppressWarnings("unchecked")
+   void clearMap()
+   {
+     ((Map) mTuple).clear();
+   }
+ }
+
+ private HashMap<Integer, CGEntry> mCGs = null; // involved CGs
+ private ArrayList<PartitionedColumn> mExecs = null; // stitches to be
+ // performed in sequence:
+ // called by LOAD
+ private int mStitchSize = 0; // number of the stitches
+ private int mSplitSize = 0; // number of the splits
+ private Schema mSchema = null;
+ private CGSchema[] mCGSchemas;
+ private PartitionInfo mPartitionInfo;
+ private Projection mProjection = null;
+ private ArrayList<PartitionedColumn> mPCNeedTmpTuple = new ArrayList<PartitionedColumn>();
+ private ArrayList<PartitionedColumn> mPCNeedMap = new ArrayList<PartitionedColumn>();
+
+ /*
+ * ctor used for LOAD
+ *
+ * Parses the storage specification against the table schema, then walks the
+ * projection schema column by column and builds the stitch executions
+ * (mExecs) plus the per-column-group user lists (mCGs) needed to assemble
+ * projected rows. The top-level stitch target is appended last.
+ */
+ public Partition(Schema schema, Projection projection, String storage)
+ throws ParseException, IOException {
+ mSchema = schema;
+ // the parser receives this Partition; mPartitionInfo is assigned before
+ // StorageSchema() runs the actual parse
+ TableStorageParser sparser =
+ new TableStorageParser(new StringReader(storage), this, mSchema);
+ mPartitionInfo = new PartitionInfo(schema);
+ ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+ mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
+ mProjection = projection;
+ Schema projSchema = projection.getProjectionSchema();
+ int size = projSchema.getNumColumns();
+ HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices;
+ PartitionInfo.ColumnMappingEntry cgindex;
+ mCGs = new HashMap<Integer, CGEntry>();
+ Schema.ColumnSchema fs;
+ CGEntry cgentry;
+ Schema.ParsedName pname = new Schema.ParsedName();
+ mExecs = new ArrayList<PartitionedColumn>();
+ PartitionedColumn parCol, curCol = new PartitionedColumn(-1, false); // top
+ // level:
+ // target
+ // of
+ // stitch
+ String name;
+ HashSet<String> projectedKeys;
+ for (int i = 0; i < size; i++) // depth-first
+ {
+ // projection columns that are null are skipped
+ if (projSchema.getColumn(i) == null)
+ continue;
+ name = projSchema.getColumn(i).name;
+ pname.setName(name);
+ projectedKeys = (projection.getKeys() == null ? null :
+ projection.getKeys().get(projSchema.getColumn(i)));
+ cgindices = getColMapping(schema, name, pname, projectedKeys);
+ if (cgindices != null) {
+ // either needs split of a CG column or the projection is a CG proper
+ fs = schema.getColumnSchema(name);
+ if (getSplitMap(fs).isEmpty()) {
+ // not MAP-split: exactly one mapping entry without keys is expected
+ if (cgindices.size() != 1)
+ throw new AssertionError( "Internal Logical Error: one RECORD split is expected.");
+ Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> entrySet = cgindices.entrySet();
+ Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> mapentry;
+ mapentry = entrySet.iterator().next();
+ cgindex = mapentry.getKey();
+ if (cgindex == null)
+ throw new AssertionError( "Internal Logical Error: RECORD does not have a CG index.");
+ if (mapentry.getValue() != null)
+ throw new AssertionError( "Internal Logical Error: RECORD should not have a split key map.");
+ cgentry = getCGEntry(cgindex.getCGIndex());
+ parCol = new PartitionedColumn(i, true);
+ mPCNeedTmpTuple.add(parCol);
+ cgentry.addUser(parCol, name);
+ curCol.addChild(parCol);
+ } else {
+ // leaves are not added to exec!
+ // NOTE(review): this re-tests the condition that selected this else
+ // branch, so it looks redundant -- confirm getSplitMap is stable
+ if (!getSplitMap(fs).isEmpty()) {
+ // this subtype is MAP-split
+ handleMapStitch(pname, curCol, fs, i,
+ getCGIndex(fs).getFieldIndex(), cgindices);
+ }
+ }
+ }
+ else {
+ // a composite column of CGs
+ fs = schema.getColumnSchema(name);
+ // names that do not resolve to a column schema are skipped
+ if (fs == null) continue;
+ parCol = new PartitionedColumn(i, true);
+ mPCNeedTmpTuple.add(parCol);
+ buildStitch(fs, pname, parCol); // depth-first
+ mExecs.add(parCol);
+ mStitchSize++;
+ curCol.addChild(parCol);
+ }
+ }
+ // allocate the temporary tuples only now that all children are known
+ for (int i = 0; i < mPCNeedTmpTuple.size(); i++)
+ mPCNeedTmpTuple.get(i).createTmpTuple();
+ // the top-level target goes last, after every column added above
+ mExecs.add(curCol);
+ mStitchSize++;
+ }
+
+ /*
+ * ctor used by STORE
+ *
+ * Parses the table schema and the storage specification from their string
+ * forms, then walks the top-level columns and builds the split executions
+ * (mExecs) plus the per-column-group user lists (mCGs). The top-level split
+ * source is added first.
+ */
+ public Partition(final String schema, final String storage)
+ throws ParseException, IOException
+ {
+ TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
+ mSchema = parser.RecordSchema(null);
+ mPartitionInfo = new PartitionInfo(mSchema);
+ TableStorageParser sparser =
+ new TableStorageParser(new StringReader(storage), this, mSchema);
+ ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+ mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
+ int size = mSchema.getNumColumns();
+ PartitionInfo.ColumnMappingEntry cgindex;
+ mCGs = new HashMap<Integer, CGEntry>();
+ CGEntry cgentry;
+ mExecs = new ArrayList<PartitionedColumn>();
+ PartitionedColumn parCol, curCol = new PartitionedColumn(-1, false); // top
+ // level:
+ // target
+ // of
+ // stitch
+ mExecs.add(curCol); // breadth-first
+ mSplitSize++;
+ Schema.ColumnSchema fs;
+ for (int i = 0; i < size; i++) {
+ fs = mSchema.getColumn(i);
+ cgindex = getCGIndex(fs);
+ if (cgindex != null) {
+ // a CG field
+ cgentry = getCGEntry(cgindex.getCGIndex());
+ // leaves are not added to exec!
+ if (getSplitMap(fs).isEmpty()) {
+ parCol = new PartitionedColumn(i, false);
+ cgentry.addUser(parCol, null); // null mean all schema
+ // replaces the projection index that addUser has just assigned with
+ // the field index inside the column group
+ parCol.setProjIndex(cgindex.getFieldIndex());
+ curCol.addChild(parCol);
+ } else {
+ // this subtype is MAP-split
+ // => need to add splits for all split keys
+ handleMapSplit(curCol, fs, i, cgentry);
+ }
+ }
+ else {
+ // a composite column of CGs
+ parCol = new PartitionedColumn(i, false);
+ mExecs.add(parCol); // breadth-first
+ mSplitSize++;
+ buildSplit(fs, parCol);
+ curCol.addChild(parCol);
+ }
+ }
+
+ // allocate temporary tuples for the columns registered in mPCNeedTmpTuple
+ for (int i = 0; i < mPCNeedTmpTuple.size(); i++)
+ mPCNeedTmpTuple.get(i).createTmpTuple();
+
+ // allocate maps for the columns registered in mPCNeedMap
+ for (int i = 0; i < mPCNeedMap.size(); i++)
+ mPCNeedMap.get(i).createMap();
+ }
+
+ /**
+  * returns table schema
+  */
+ public Schema getSchema() {
+   return this.mSchema;
+ }
+
+ /*
+  * returns the partition info that the storage parser works on
+  */
+ public PartitionInfo getPartitionInfo() {
+   return this.mPartitionInfo;
+ }
+
+ /*
+  * returns all column group schemas (the internal array, not a copy)
+  */
+ public CGSchema[] getCGSchemas() {
+   return this.mCGSchemas;
+ }
+
+ /*
+  * returns a particular column group schema, or null when no column group
+  * schemas are present; the index is not range-checked here
+  */
+ public CGSchema getCGSchema(int index) {
+   return (mCGSchemas == null) ? null : mCGSchemas[index];
+ }
+
+ /*
+ * search from the most specific name until the least specific: if none found
+ * return null; In addition, the fq name portion after the matched CG's fq
+ * name is returned in pn. For MAP split columns, it returns the catch-all
+ * CG not the CGs with specific keys if this is not a key projection; otherwise
+ * a map of CG fields to projected keys is returned
+ *
+ * The schema parameter is not used by this method.
+ */
+ private HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> getColMapping(Schema schema,
+ String name, Schema.ParsedName pn, HashSet<String> projectedKeys) throws ParseException {
+ Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap =
+ mPartitionInfo.mColMap;
+ int fromIndex = name.length() - 1, lastHashIndex, lastFieldIndex;
+
+ // first try the full name as a column group name
+ HashSet<PartitionInfo.ColumnMappingEntry> results = colmap.get(name);
+ HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> result = null;
+ HashSet<String> keys;
+ String ancestorName;
+ boolean map = false;
+ if (results != null)
+ {
+ if (projectedKeys != null)
+ {
+ // key projection on an exact match: handled by the MAP branch below
+ pn.mDT = ColumnType.MAP;
+ map = true;
+ } else {
+ pn.mDT = ColumnType.ANY;
+ PartitionInfo.ColumnMappingEntry cme;
+ for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator(); it.hasNext(); )
+ {
+ cme = it.next();
+ if (cme.getKeys() == null)
+ {
+ // no specific keys are interested. Either a MAP-split without key or a RECORD-split is OK
+ result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+ result.put(cme, null);
+ return result;
+ }
+ }
+ // every entry carries keys, so there is no catch-all entry to return
+ return null;
+ }
+ }
+
+ // no exact match: strip trailing ".field" / "#key" components one at a
+ // time and retry with each shorter ancestor name
+ while (results == null) {
+ lastHashIndex = name.lastIndexOf('#', fromIndex);
+ lastFieldIndex = name.lastIndexOf('.', fromIndex);
+ if (lastHashIndex == -1 && lastFieldIndex == -1) break;
+ else if (lastHashIndex == -1) {
+ fromIndex = lastFieldIndex;
+ }
+ else if (lastFieldIndex == -1) {
+ fromIndex = lastHashIndex;
+ map = true;
+ }
+ else {
+ // adjacent '#' and '.' separators end the search
+ if (lastHashIndex == lastFieldIndex - 1
+ || lastFieldIndex == lastHashIndex - 1) break;
+ fromIndex =
+ (lastFieldIndex > lastHashIndex ? lastFieldIndex : lastHashIndex);
+ }
+ if (fromIndex <= 0) break;
+ ancestorName = name.substring(0, fromIndex);
+ results = colmap.get(ancestorName);
+ fromIndex--;
+ }
+ if (results != null) {
+ if (map)
+ {
+ // build a HashMap from ColumnGroupMappingEntry to a set of projected keys for MAP-split
+ if (results.isEmpty())
+ throw new AssertionError( "Internal Logical Error: split is expected.");
+ PartitionInfo.ColumnMappingEntry thisCG, defaultCG = null;
+ boolean found;
+ String projectedKey;
+ // NOTE(review): projectedKeys is dereferenced here. map can also be set
+ // by the '#' search above, in which case a null projectedKeys would
+ // throw NullPointerException -- confirm callers never do that.
+ for (Iterator<String> projectedKeyIt = projectedKeys.iterator(); projectedKeyIt.hasNext();)
+ {
+ projectedKey = projectedKeyIt.next();
+ found = false;
+ thisCG = null;
+ // look for the entry holding this key; remember the key-less entry
+ // as the default on the way
+ for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator();
+ it.hasNext(); )
+ {
+ thisCG = it.next();
+ keys = thisCG.getKeys();
+ if (keys == null)
+ {
+ defaultCG = thisCG;
+ } else {
+ if (keys.contains(projectedKey))
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+
+ if (!found)
+ {
+ if (defaultCG == null)
+ throw new AssertionError( "Internal Logical Error: default MAP split CG is missing.");
+
+ thisCG =defaultCG;
+ }
+ if (result == null)
+ result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+ if ((keys = result.get(thisCG)) == null)
+ {
+ keys = new HashSet<String>();
+ result.put(thisCG, keys);
+ }
+ keys.add(projectedKey);
+ }
+ // an empty results set was already rejected at the top of this branch,
+ // so this check cannot fire
+ if (result == null)
+ if (results.isEmpty())
+ throw new AssertionError( "Internal Logical Error: Default MAP split column is missing.");
+ } else {
+ // a RECORD-split
+ if (results.size() != 1)
+ throw new AssertionError(
+ "Internal Logical Error: A single split is expected.");
+ result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+ result.put(results.iterator().next(), null);
+ return result;
+ }
+ // the RECORD-split branch above has returned, so map is always true
+ // here and the !map block below is never executed
+ // discard the CG fq name
+ fromIndex += 2;
+ if (!map)
+ {
+ // no need to crawl down after encountering a MAP-split subcolumn
+ pn.setName(name.substring(fromIndex));
+ pn.parseName(results.iterator().next().getColumnSchema());
+ }
+ }
+ return result;
+ }
+
+ /**
+  * Recursively builds the stitch executions for the sub-columns of
+  * {@code fs}, descending depth-first.
+  * <p>
+  * For every sub-column one of three things happens:
+  * <ul>
+  * <li>no column-group mapping is registered for it: a partitioned column is
+  * created, the method recurses into the sub-column, and only then is the new
+  * column appended to the execution list (so children precede parents);</li>
+  * <li>it maps to a column group and its split map is empty: it is registered
+  * as a user of that column group;</li>
+  * <li>it maps to a column group and its split map is not empty: the mapping
+  * entries whose key set is {@code null} are collected and handed to
+  * {@link #handleMapStitch}.</li>
+  * </ul>
+  *
+  * @param fs column whose nested schema is walked; must have at least one
+  *        sub-column
+  * @param pn parsed projection name; it is re-parsed against {@code fs} and
+  *        its name and type seed the parsed name passed down for each child
+  * @param parent partitioned column that receives the columns created here
+  * @throws ParseException propagated from the calls made here
+  * @throws IOException propagated from the calls made here
+  * @throws AssertionError if {@code fs} has no nested schema or no sub-columns
+  */
+ private void buildStitch(Schema.ColumnSchema fs, Schema.ParsedName pn,
+     PartitionedColumn parent) throws ParseException, IOException {
+   if (fs.schema == null || fs.schema.getNumColumns() <= 0) {
+     throw new AssertionError(
+         "Internal Logical Error: Leaf type must have a CG ancestor or is CG itself.");
+   }
+
+   pn.parseName(fs);
+   Schema.ParsedName childName = new Schema.ParsedName();
+   for (int pos = 0; pos < fs.schema.getNumColumns(); pos++) {
+     // reset for every child: a recursive call re-parses the name it is given
+     childName.setName(new String(pn.mName), pn.mDT);
+     Schema.ColumnSchema child = fs.schema.getColumn(pos);
+     PartitionInfo.ColumnMappingEntry mapping = getCGIndex(child);
+
+     if (mapping == null) {
+       // not a CG: go one level lower, children are queued before this column
+       PartitionedColumn inner = new PartitionedColumn(pos, true);
+       mPCNeedTmpTuple.add(inner);
+       buildStitch(child, childName, inner);
+       mExecs.add(inner);
+       mStitchSize++;
+       parent.addChild(inner);
+       continue;
+     }
+
+     if (getSplitMap(child).isEmpty()) {
+       // this sub-column is not MAP-split: it is read from a single CG
+       CGEntry owner = getCGEntry(mapping.getCGIndex());
+       PartitionedColumn leaf = new PartitionedColumn(pos, true);
+       mPCNeedTmpTuple.add(leaf);
+       owner.addUser(leaf, getCGName(child));
+       parent.addChild(leaf);
+       continue;
+     }
+
+     // this sub-column is MAP-split: keep only the entries without a key set
+     Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap =
+         mPartitionInfo.mColMap;
+     HashSet<PartitionInfo.ColumnMappingEntry> splits =
+         colmap.get(getCGName(child));
+     HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> keyless =
+         new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+     for (PartitionInfo.ColumnMappingEntry split : splits) {
+       HashSet<String> splitKeys = split.getKeys();
+       if (splitKeys == null) {
+         keyless.put(split, splitKeys);
+       }
+     }
+     handleMapStitch(childName, parent, child, pos, mapping.getFieldIndex(),
+         keyless);
+   }
+ }
+
+ /**
+ * build stitches on a MAP split (sub)column
+ * @throws IOException
+ */
+ private void handleMapStitch(Schema.ParsedName pn,
+ PartitionedColumn parent, Schema.ColumnSchema child, int i,
+ int fi, HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices) throws IOException {
+ CGEntry cgentry;
+ if (pn.mDT == ColumnType.ANY) {
+ // this subtype is MAP split and the projection is on the whole MAP:
+ // => need to add stitches for all split keys
+
+ // first the map partitioned column that contain all non-key-partitioned
+ // hashes
+ if (cgindices == null || cgindices.size() != 1) {
+ throw new AssertionError(
+ "Internal Logical Error: Invalid map key size.");
+ }
+
+ Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> entrySet = cgindices.entrySet();
+ Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> mapentry;
+ mapentry = entrySet.iterator().next();
+ if (mapentry.getValue() != null)
+ throw new AssertionError( "Internal Logical Error: RECORD should not have a split key map.");
+
+ cgentry = getCGEntry(getCGIndex(child).getCGIndex());
+
+ PartitionedColumn mapParCol =
+ new PartitionedColumn(i, Partition.SplitType.MAP, true);
+ mPCNeedTmpTuple.add(mapParCol);
+ cgentry.addUser(mapParCol, getCGName(child));
+ mExecs.add(mapParCol); // not a leaf : MAP stitch needed
+ mStitchSize++;
+ parent.addChild(mapParCol);
+
+ // the map-key partitioned columns:
+ HashSet<PartitionInfo.ColumnMappingEntry> splitMap =
+ getSplitMap(child);
+ PartitionInfo.ColumnMappingEntry cgindex;
+ int index;
+ HashSet<Integer> projectedCGs = new HashSet<Integer>();
+ for (Iterator<PartitionInfo.ColumnMappingEntry> it = splitMap.iterator(); it.hasNext();) {
+ cgindex = it.next();
+ index = cgindex.getCGIndex();
+ cgentry = getCGEntry(index);
+ // if the CG is already included in this sub-column stitch, then no need
+ // to add it as a separate stitch since all keys therein is already included
+ if (!projectedCGs.contains(index))
+ {
+ PartitionedColumn parCol =
+ new PartitionedColumn(0, true);
+ mPCNeedTmpTuple.add(parCol);
+ cgentry.addUser(parCol, getCGName(child), cgindex.getKeys());
+ mapParCol.addChild(parCol); // contribute to the non-key-partitioned
+ // hashes
+ mPCNeedMap.add(parCol);
+ projectedCGs.add(index);
+ }
+ }
+ }
+ else {
+ // this sub-type is MAP split and the projection is on another key which
+ // is not a split key:
+ // => need to add a specific key stitch
+ if (cgindices == null) {
+ throw new AssertionError(
+ "Internal Logical Error: MAP key set is empty.");
+ }
+
+ Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> entrySet = cgindices.entrySet();
+ HashSet<String> projectedKeys;
+ Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> mapentry;
+ boolean needParent = (entrySet.size() > 1);
+ boolean newParent = false;
+ PartitionedColumn parCol;
+ for (Iterator<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> it = entrySet.iterator(); it.hasNext(); )
+ {
+ mapentry = it.next();
+ projectedKeys = mapentry.getValue();
+ cgentry = getCGEntry(mapentry.getKey().getCGIndex());
+ if (needParent)
+ {
+ parCol = new PartitionedColumn(i, Partition.SplitType.MAP, true);
+ mExecs.add(parCol); // not a leaf : MAP stitch needed
+ mStitchSize++;
+ mPCNeedMap.add(parCol);
+ parent.addChild(parCol);
+ parent = parCol;
+ needParent = false;
+ newParent = true;
+ } else {
+ parCol = new PartitionedColumn(newParent ? 0 : i, true);
+ parent.addChild(parCol);
+ }
+ mPCNeedTmpTuple.add(parCol);
+ cgentry.addUser(parCol, getCGName(child), projectedKeys);
+ }
+ }
+ }
+
+ /**
+  * Returns the entry registered in {@code mCGs} for a column group index,
+  * creating and registering a new one when none exists yet.
+  *
+  * @param cgindex column group index used as the key into {@code mCGs}
+  * @return the existing or newly created entry, never {@code null}
+  */
+ private CGEntry getCGEntry(int cgindex)
+ {
+   CGEntry known = mCGs.get(cgindex);
+   if (known != null) {
+     return known;
+   }
+   CGEntry created = new CGEntry(cgindex);
+   mCGs.put(cgindex, created);
+   return created;
+ }
+
+ /**
+  * Recursively builds the split executions for the sub-columns of
+  * {@code fs}.
+  * <p>
+  * A sub-column without a column-group mapping gets a partitioned column that
+  * is appended to the execution list before the method recurses into it (so
+  * parents precede children). A mapped sub-column with an empty split map is
+  * registered as a user of its column group and given that mapping's field
+  * index as projection index. A mapped sub-column with a non-empty split map
+  * is passed to {@link #handleMapSplit}.
+  *
+  * @param fs column whose nested schema is walked; must have at least one
+  *        sub-column
+  * @param parent partitioned column that receives the columns created here
+  * @throws ParseException propagated from the calls made here
+  * @throws IOException propagated from the calls made here
+  * @throws AssertionError if {@code fs} has no nested schema or no sub-columns
+  */
+ private void buildSplit(Schema.ColumnSchema fs, PartitionedColumn parent)
+     throws ParseException, IOException {
+   if (fs.schema == null || fs.schema.getNumColumns() <= 0) {
+     throw new AssertionError(
+         "Internal Logical Error: Leaf type must have a CG ancestor or is CG itself.");
+   }
+
+   for (int pos = 0; pos < fs.schema.getNumColumns(); pos++) {
+     Schema.ColumnSchema child = fs.schema.getColumn(pos);
+     PartitionInfo.ColumnMappingEntry mapping = getCGIndex(child);
+
+     if (mapping == null) {
+       // not a CG: queue this column first, then go one level lower
+       PartitionedColumn inner = new PartitionedColumn(pos, false);
+       mExecs.add(inner);
+       mSplitSize++;
+       parent.addChild(inner);
+       buildSplit(child, inner);
+       continue;
+     }
+
+     CGEntry owner = getCGEntry(mapping.getCGIndex());
+     if (!getSplitMap(child).isEmpty()) {
+       // this subfield is MAP-split => splits are needed for all split keys
+       handleMapSplit(parent, child, pos, owner);
+       continue;
+     }
+
+     // this subfield is not MAP-split
+     PartitionedColumn leaf = new PartitionedColumn(pos, false);
+     owner.addUser(leaf, getCGName(child));
+     leaf.setProjIndex(mapping.getFieldIndex());
+     parent.addChild(leaf);
+   }
+ }
+
+ /**
+  * Builds the splits for a MAP-split (sub)column.
+  * <p>
+  * A MAP partitioned column is created for {@code child}, registered with
+  * {@code cgentry}, appended to the execution list and attached to
+  * {@code parent}. Then, for every entry in the split map of {@code child}, a
+  * child column carrying that entry's key set and field index is registered
+  * with the entry's column group and attached under the MAP column.
+  *
+  * @param parent partitioned column that receives the MAP column
+  * @param child schema of the MAP-split column
+  * @param i index given to the MAP column
+  * @param cgentry column group entry the MAP column is registered with
+  * @throws ParseException propagated from the calls made here
+  * @throws IOException propagated from the calls made here
+  */
+ private void handleMapSplit(PartitionedColumn parent,
+     Schema.ColumnSchema child, int i, CGEntry cgentry) throws ParseException, IOException {
+   // first the MAP column that holds all non-key-partitioned hashes
+   PartitionedColumn wholeMap =
+       new PartitionedColumn(i, Partition.SplitType.MAP, false);
+   cgentry.addUser(wholeMap, getCGName(child));
+   mExecs.add(wholeMap); // not a leaf : MAP split needed
+   mSplitSize++;
+   parent.addChild(wholeMap);
+
+   // then one column per split-map entry
+   for (PartitionInfo.ColumnMappingEntry split : getSplitMap(child)) {
+     CGEntry owner = getCGEntry(split.getCGIndex());
+     HashSet<String> splitKeys = split.getKeys();
+     PartitionedColumn keyed = new PartitionedColumn(0, false);
+     keyed.setKeys(splitKeys);
+     owner.addUser(keyed, getCGName(child), splitKeys);
+     keyed.setProjIndex(split.getFieldIndex());
+     // contributes to the non-key-partitioned hashes
+     wholeMap.addChild(keyed);
+     mPCNeedMap.add(keyed); // children need the tmp map
+   }
+ }
+
+ /**
+ * read in a tuple based on stitches
+ */
+ public void read(Tuple t) throws AssertionError, IOException, Exception {
+ if (mStitchSize == 0 || mCGs == null || mCGs.isEmpty())
+ return;
+
+ // dispatch
+ mExecs.get(mStitchSize - 1).setRecord(t);
+
+ // read in CG data
+ Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
+ Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
+ while (it.hasNext())
+ it.next().getValue().read();
+
+ TypesUtils.resetTuple(t);
+ // dispatch
+ mExecs.get(mStitchSize - 1).setRecord(t);
+
+ // start the stitch
+ for (int i = 0; i < mStitchSize; i++)
+ mExecs.get(i).stitch();
+
+ return;
+ }
+
+ /**
+ * insert a tuple after splits
+ */
+ public void insert(final BytesWritable key, final Tuple t)
+ throws AssertionError, IOException, Exception {
+ if (mSplitSize == 0 || mCGs == null || mCGs.isEmpty())
+ throw new AssertionError("Empty Column Group List!");
+
+ // dispatch
+ mExecs.get(0).setRecord(t);
+ for (int i = 0; i < mPCNeedMap.size(); i++)
+ mPCNeedMap.get(i).clearMap();
+
+ for (int i = 0; i < mSplitSize; i++)
+ mExecs.get(i).split();
+
+ // insert CG data
+ Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
+ Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
+ while (it.hasNext())
+ it.next().getValue().insert(key);
+ return;
+ }
+
+ /**
+ * sets the source tuple for the column group ops
+ */
+ public void setSource(Tuple[] tuples) throws ParseException {
+ if (tuples.length < mCGs.size())
+ throw new ParseException(
+ "Internal Logical Error: Invalid number of column groups");
+ for (int i = 0; i < tuples.length; i++) {
+ if (mCGs.get(i) != null) mCGs.get(i).setSource(tuples[i]);
+ }
+ }
+
+ /**
+  * Returns the projection of a particular column group.
+  *
+  * @param cgindex column group index used as the key into {@code mCGs}
+  * @return the projection reported by the entry registered for
+  *         {@code cgindex}, or {@code null} when no entry is registered
+  * @throws ParseException propagated from the entry's {@code getProjection}
+  */
+ public String getProjection(int cgindex) throws ParseException {
+   CGEntry group = mCGs.get(cgindex);
+   return group == null ? null : group.getProjection();
+ }
+
+ /**
+ * Returns the table projection.
+ *
+ * @return the value of the {@code mProjection} field, as is
+ */
+ public Projection getProjection() {
+ return mProjection;
+ }
+
+ /**
+ * Returns the split map of a column by delegating to
+ * {@code mPartitionInfo.getSplitMap(fs)}.
+ * <p>
+ * Callers in this class treat an empty result as "the column is not
+ * MAP-split".
+ *
+ * @param fs column schema to look up
+ * @return the column mapping entries reported by the partition info
+ */
+ public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
+ Schema.ColumnSchema fs) {
+ return mPartitionInfo.getSplitMap(fs);
+ }
+
+ /**
+ * Generates the schema of the default column group by delegating to
+ * {@code mPartitionInfo.generateDefaultCGSchema}; all arguments are passed
+ * through unchanged.
+ *
+ * @param compressor passed through (presumably the compressor name - confirm
+ *        against PartitionInfo)
+ * @param serializer passed through (presumably the serializer name - confirm
+ *        against PartitionInfo)
+ * @param defaultCGIndex passed through (presumably the index assigned to the
+ *        default column group - confirm against PartitionInfo)
+ * @return whatever the partition info returns
+ * @throws ParseException propagated from the partition info
+ */
+ public CGSchema generateDefaultCGSchema(String compressor, String serializer,
+ final int defaultCGIndex) throws ParseException {
+ return mPartitionInfo.generateDefaultCGSchema(compressor, serializer,
+ defaultCGIndex);
+ }
+
+ /**
+ * Records split information for a column by delegating to
+ * {@code mPartitionInfo.setSplit}; all arguments are passed through
+ * unchanged.
+ * <p>
+ * NOTE(review): presumably {@code st} is the split type of {@code fs} and
+ * {@code cst} that of its child, with {@code name}/{@code childName} their
+ * names - confirm against PartitionInfo.setSplit.
+ *
+ * @throws ParseException propagated from the partition info
+ */
+ void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
+ mPartitionInfo.setSplit(fs, st, cst, name, childName, splitChild);
+ }
+
+ /**
+ * Records the column group mapping of a column by delegating to
+ * {@code mPartitionInfo.setCGIndex}; all arguments are passed through
+ * unchanged.
+ * <p>
+ * NOTE(review): presumably {@code ri} is the column group index and
+ * {@code fi} the field index within it - confirm against
+ * PartitionInfo.setCGIndex.
+ *
+ * @return whatever the partition info returns
+ */
+ boolean setCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name) {
+ return mPartitionInfo.setCGIndex(fs, ri, fi, name);
+ }
+
+ /**
+  * Looks up the column group mapping of a column.
+  *
+  * @param fs column schema used as the key into the partition info's
+  *        field map
+  * @return the mapping entry of the field info registered for {@code fs}, or
+  *         {@code null} when no field info is registered
+  */
+ PartitionInfo.ColumnMappingEntry getCGIndex(Schema.ColumnSchema fs) {
+   PartitionInfo.PartitionFieldInfo fieldInfo = mPartitionInfo.fieldMap.get(fs);
+   return fieldInfo == null ? null : fieldInfo.getCGIndex();
+ }
+
+ /**
+  * Looks up the fully qualified column group name of a column.
+  *
+  * @param fs column schema used as the key into the partition info's
+  *        field map
+  * @return fully qualified name for a CG column, or {@code null} when no
+  *         field info is registered for {@code fs}
+  */
+ String getCGName(Schema.ColumnSchema fs) {
+   PartitionInfo.PartitionFieldInfo fieldInfo = mPartitionInfo.fieldMap.get(fs);
+   return fieldInfo == null ? null : fieldInfo.getCGName();
+ }
+
+ /**
+ * Tells whether a column group is needed, i.e. whether {@code mCGs} has an
+ * entry registered under the given index.
+ *
+ * @param i column group index
+ * @return {@code true} if {@code mCGs} contains the key {@code i}
+ */
+ public boolean isCGNeeded(int i)
+ {
+ return mCGs.containsKey(i);
+ }
+}