You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:34 UTC
[37/51] [partial] parquet-mr git commit: PARQUET-23: Rename to
org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
new file mode 100644
index 0000000..15c28c8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+
+/**
+ * Primitive level of the IO structure
+ *
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class PrimitiveColumnIO extends ColumnIO {
+// private static final Logger logger = Logger.getLogger(PrimitiveColumnIO.class.getName());
+
+ private ColumnIO[] path;
+ private ColumnDescriptor columnDescriptor;
+ private final int id;
+
+ PrimitiveColumnIO(Type type, GroupColumnIO parent, int index, int id) {
+ super(type, parent, index);
+ this.id = id;
+ }
+
+ @Override
+ void setLevels(int r, int d, String[] fieldPath, int[] fieldIndexPath, List<ColumnIO> repetition, List<ColumnIO> path) {
+ super.setLevels(r, d, fieldPath, fieldIndexPath, repetition, path);
+ PrimitiveType type = getType().asPrimitiveType();
+ this.columnDescriptor = new ColumnDescriptor(
+ fieldPath,
+ type.getPrimitiveTypeName(),
+ type.getTypeLength(),
+ getRepetitionLevel(),
+ getDefinitionLevel());
+ this.path = path.toArray(new ColumnIO[path.size()]);
+ }
+
+ @Override
+ List<String[]> getColumnNames() {
+ return Arrays.asList(new String[][] { getFieldPath() });
+ }
+
+ public ColumnDescriptor getColumnDescriptor() {
+ return columnDescriptor;
+ }
+
+ public ColumnIO[] getPath() {
+ return path;
+ }
+
+ public boolean isLast(int r) {
+ return getLast(r) == this;
+ }
+
+ private PrimitiveColumnIO getLast(int r) {
+ ColumnIO parent = getParent(r);
+
+ PrimitiveColumnIO last = parent.getLast();
+ return last;
+ }
+
+ @Override
+ PrimitiveColumnIO getLast() {
+ return this;
+ }
+
+ @Override
+ PrimitiveColumnIO getFirst() {
+ return this;
+ }
+ public boolean isFirst(int r) {
+ return getFirst(r) == this;
+ }
+
+ private PrimitiveColumnIO getFirst(int r) {
+ ColumnIO parent = getParent(r);
+ return parent.getFirst();
+ }
+
+ public PrimitiveTypeName getPrimitive() {
+ return getType().asPrimitiveType().getPrimitiveTypeName();
+ }
+
+ public int getId() {
+ return id;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
new file mode 100644
index 0000000..f753b2a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.Arrays;
+import org.apache.parquet.Log;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+
+/**
+ * This class can be used to wrap an actual RecordConsumer and log all calls
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class RecordConsumerLoggingWrapper extends RecordConsumer {
+ private static final Log logger = Log.getLog(RecordConsumerLoggingWrapper.class);
+ private static final boolean DEBUG = Log.DEBUG;
+
+ private final RecordConsumer delegate;
+
+ int indent = 0;
+
+ /**
+ * all calls are delegated to the wrapped delegate
+ * @param delegate
+ */
+ public RecordConsumerLoggingWrapper(RecordConsumer delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void startField(String field, int index) {
+ if (DEBUG) logOpen(field);
+ delegate.startField(field, index);
+ }
+
+ private void logOpen(String field) {
+ log("<"+field+">");
+ }
+
+ private String indent() {
+ StringBuilder result = new StringBuilder();
+ for (int i = 0; i < indent; i++) {
+ result.append(" ");
+ }
+ return result.toString();
+ }
+
+ private void log(Object value) {
+ logger.debug(indent() + value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void startGroup() {
+ if (DEBUG) ++indent;
+ if (DEBUG) log("<!-- start group -->");
+ delegate.startGroup();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addInteger(int value) {
+ if (DEBUG) log(value);
+ delegate.addInteger(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addLong(long value) {
+ if (DEBUG) log(value);
+ delegate.addLong(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addBoolean(boolean value) {
+ if (DEBUG) log(value);
+ delegate.addBoolean(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addBinary(Binary value) {
+ if (DEBUG) log(Arrays.toString(value.getBytes()));
+ delegate.addBinary(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addFloat(float value) {
+ if (DEBUG) log(value);
+ delegate.addFloat(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addDouble(double value) {
+ if (DEBUG) log(value);
+ delegate.addDouble(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void endGroup() {
+ if (DEBUG) log("<!-- end group -->");
+ if (DEBUG) --indent;
+ delegate.endGroup();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void endField(String field, int index) {
+ if (DEBUG) logClose(field);
+ delegate.endField(field, index);
+ }
+
+ private void logClose(String field) {
+ log("</"+field+">");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void startMessage() {
+ if (DEBUG) log("<!-- start message -->");
+ delegate.startMessage();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void endMessage() {
+ delegate.endMessage();
+ if (DEBUG) log("<!-- end message -->");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java
new file mode 100644
index 0000000..a9eb2f5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+
+/**
+ * used to read reassembled records
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized record
+ */
+public abstract class RecordReader<T> {
+
+ /**
+ * Reads one record and returns it.
+ * @return the materialized record
+ */
+ public abstract T read();
+
+ /**
+ * Returns whether the current record should be skipped (dropped).
+ * Will be called *after* read().
+ */
+ public boolean shouldSkipCurrentRecord() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
new file mode 100644
index 0000000..7a87cbb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+
+/**
+ * used to read reassembled records
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized record
+ */
+class RecordReaderImplementation<T> extends RecordReader<T> {
+ private static final Log LOG = Log.getLog(RecordReaderImplementation.class);
+
+ public static class Case {
+
+ private int id;
+ private final int startLevel;
+ private final int depth;
+ private final int nextLevel;
+ private final boolean goingUp;
+ private final boolean goingDown;
+ private final int nextState;
+ private final boolean defined;
+
+ public Case(int startLevel, int depth, int nextLevel, int nextState, boolean defined) {
+ this.startLevel = startLevel;
+ this.depth = depth;
+ this.nextLevel = nextLevel;
+ this.nextState = nextState;
+ this.defined = defined;
+ // means going up the tree (towards the leaves) of the record
+ // true if we need to open up groups in this case
+ goingUp = startLevel <= depth;
+ // means going down the tree (towards the root) of the record
+ // true if we need to close groups in this case
+ goingDown = depth + 1 > nextLevel;
+ }
+
+ public void setID(int id) {
+ this.id = id;
+ }
+
+ @Override
+ // this implementation is buggy but the simpler one below has duplicates.
+ // it still works but generates more code than necessary
+ // a middle ground is necessary
+// public int hashCode() {
+// int hashCode = 0;
+// if (goingUp) {
+// hashCode += 1 * (1 + startLevel) + 2 * (1 + depth);
+// }
+// if (goingDown) {
+// hashCode += 3 * (1 + depth) + 5 * (1 + nextLevel);
+// }
+// return hashCode;
+// }
+
+ public int hashCode() {
+ int hashCode = 17;
+ hashCode += 31 * startLevel;
+ hashCode += 31 * depth;
+ hashCode += 31 * nextLevel;
+ hashCode += 31 * nextState;
+ hashCode += 31 * (defined ? 0 : 1);
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Case) {
+ return equals((Case)obj);
+ }
+ return false;
+ };
+
+ // see comment for hashCode above
+// public boolean equals(Case other) {
+// if (goingUp && !other.goingUp || !goingUp && other.goingUp) {
+// return false;
+// }
+// if (goingUp && other.goingUp && (startLevel != other.startLevel || depth != other.depth)) {
+// return false;
+// }
+// if (goingDown && !other.goingDown || !goingDown && other.goingDown) {
+// return false;
+// }
+// if (goingDown && other.goingDown && (depth != other.depth || nextLevel != other.nextLevel)) {
+// return false;
+// }
+// return true;
+// }
+
+ public boolean equals(Case other) {
+ return startLevel == other.startLevel
+ && depth == other.depth
+ && nextLevel == other.nextLevel
+ && nextState == other.nextState
+ && ((defined && other.defined) || (!defined && !other.defined));
+ }
+
+ public int getID() {
+ return id;
+ }
+
+ public int getStartLevel() {
+ return startLevel;
+ }
+
+ public int getDepth() {
+ return depth;
+ }
+ public int getNextLevel() {
+ return nextLevel;
+ }
+
+ public int getNextState() {
+ return nextState;
+ }
+
+ public boolean isGoingUp() {
+ return goingUp;
+ }
+
+ public boolean isGoingDown() {
+ return goingDown;
+ }
+
+ public boolean isDefined() {
+ return defined;
+ }
+
+ @Override
+ public String toString() {
+ return "Case " + startLevel + " -> " + depth + " -> " + nextLevel + "; goto sate_"+getNextState();
+ }
+
+ }
+
+ public static class State {
+
+ public final int id;
+ public final PrimitiveColumnIO primitiveColumnIO;
+ public final int maxDefinitionLevel;
+ public final int maxRepetitionLevel;
+ public final PrimitiveTypeName primitive;
+ public final ColumnReader column;
+ public final String[] fieldPath; // indexed by currentLevel
+ public final int[] indexFieldPath; // indexed by currentLevel
+ public final GroupConverter[] groupConverterPath;
+ public final PrimitiveConverter primitiveConverter;
+ public final String primitiveField;
+ public final int primitiveFieldIndex;
+ public final int[] nextLevel; //indexed by next r
+
+ private int[] definitionLevelToDepth; // indexed by current d
+ private State[] nextState; // indexed by next r
+ private Case[][][] caseLookup;
+ private List<Case> definedCases;
+ private List<Case> undefinedCases;
+
+ private State(int id, PrimitiveColumnIO primitiveColumnIO, ColumnReader column, int[] nextLevel, GroupConverter[] groupConverterPath, PrimitiveConverter primitiveConverter) {
+ this.id = id;
+ this.primitiveColumnIO = primitiveColumnIO;
+ this.maxDefinitionLevel = primitiveColumnIO.getDefinitionLevel();
+ this.maxRepetitionLevel = primitiveColumnIO.getRepetitionLevel();
+ this.column = column;
+ this.nextLevel = nextLevel;
+ this.groupConverterPath = groupConverterPath;
+ this.primitiveConverter = primitiveConverter;
+ this.primitive = primitiveColumnIO.getType().asPrimitiveType().getPrimitiveTypeName();
+ this.fieldPath = primitiveColumnIO.getFieldPath();
+ this.primitiveField = fieldPath[fieldPath.length - 1];
+ this.indexFieldPath = primitiveColumnIO.getIndexFieldPath();
+ this.primitiveFieldIndex = indexFieldPath[indexFieldPath.length - 1];
+ }
+
+ public int getDepth(int definitionLevel) {
+ return definitionLevelToDepth[definitionLevel];
+ }
+
+ public List<Case> getDefinedCases() {
+ return definedCases;
+ }
+
+ public List<Case> getUndefinedCases() {
+ return undefinedCases;
+ }
+
+ public Case getCase(int currentLevel, int d, int nextR) {
+ return caseLookup[currentLevel][d][nextR];
+ }
+
+ public State getNextState(int nextR) {
+ return nextState[nextR];
+ }
+ }
+
+ private final GroupConverter recordRootConverter;
+ private final RecordMaterializer<T> recordMaterializer;
+
+ private State[] states;
+ private ColumnReader[] columnReaders;
+
+ private boolean shouldSkipCurrentRecord = false;
+
+ /**
+ * @param root the root of the schema
+ * @param recordMaterializer responsible for materializing the records
+ * @param validating whether we should validate against the schema
+ * @param columnStore where to read the column data from
+ */
+ public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer<T> recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) {
+ this.recordMaterializer = recordMaterializer;
+ this.recordRootConverter = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType());
+ PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[root.getLeaves().size()]);
+ columnReaders = new ColumnReader[leaves.length];
+ int[][] nextColumnIdxForRepLevel = new int[leaves.length][];
+ int[][] levelToClose = new int[leaves.length][];
+ GroupConverter[][] groupConverterPaths = new GroupConverter[leaves.length][];
+ PrimitiveConverter[] leafConverters = new PrimitiveConverter[leaves.length];
+ int[] firstIndexForLevel = new int[256]; // "256 levels of nesting ought to be enough for anybody"
+ // build the automaton
+ for (int i = 0; i < leaves.length; i++) {
+ PrimitiveColumnIO leafColumnIO = leaves[i];
+ //generate converters along the path from root to leaf
+ final int[] indexFieldPath = leafColumnIO.getIndexFieldPath();
+ groupConverterPaths[i] = new GroupConverter[indexFieldPath.length - 1];
+ GroupConverter current = this.recordRootConverter;
+ for (int j = 0; j < indexFieldPath.length - 1; j++) {
+ current = current.getConverter(indexFieldPath[j]).asGroupConverter();
+ groupConverterPaths[i][j] = current;
+ }
+ leafConverters[i] = current.getConverter(indexFieldPath[indexFieldPath.length - 1]).asPrimitiveConverter();
+ columnReaders[i] = columnStore.getColumnReader(leafColumnIO.getColumnDescriptor());
+ int maxRepetitionLevel = leafColumnIO.getRepetitionLevel();
+ nextColumnIdxForRepLevel[i] = new int[maxRepetitionLevel+1];
+
+ levelToClose[i] = new int[maxRepetitionLevel+1]; //next level
+ for (int nextRepLevel = 0; nextRepLevel <= maxRepetitionLevel; ++nextRepLevel) {
+ // remember which is the first for this level
+ if (leafColumnIO.isFirst(nextRepLevel)) {
+ firstIndexForLevel[nextRepLevel] = i;
+ }
+ int nextColIdx;
+ //TODO: when we use nextColumnIdxForRepLevel, should we provide current rep level or the rep level for next item
+ // figure out automaton transition
+ if (nextRepLevel == 0) { // 0 always means jump to the next (the last one being a special case)
+ nextColIdx = i + 1;
+ } else if (leafColumnIO.isLast(nextRepLevel)) { // when we are at the last of the next repetition level we jump back to the first
+ nextColIdx = firstIndexForLevel[nextRepLevel];
+ } else { // otherwise we just go back to the next.
+ nextColIdx = i + 1;
+ }
+ // figure out which level down the tree we need to go back
+ if (nextColIdx == leaves.length) { // reached the end of the record => close all levels
+ levelToClose[i][nextRepLevel] = 0;
+ } else if (leafColumnIO.isLast(nextRepLevel)) { // reached the end of this level => close the repetition level
+ ColumnIO parent = leafColumnIO.getParent(nextRepLevel);
+ levelToClose[i][nextRepLevel] = parent.getFieldPath().length - 1;
+ } else { // otherwise close until the next common parent
+ levelToClose[i][nextRepLevel] = getCommonParentLevel(
+ leafColumnIO.getFieldPath(),
+ leaves[nextColIdx].getFieldPath());
+ }
+ // sanity check: that would be a bug
+ if (levelToClose[i][nextRepLevel] > leaves[i].getFieldPath().length-1) {
+ throw new ParquetEncodingException(Arrays.toString(leaves[i].getFieldPath())+" -("+nextRepLevel+")-> "+levelToClose[i][nextRepLevel]);
+ }
+ nextColumnIdxForRepLevel[i][nextRepLevel] = nextColIdx;
+ }
+ }
+ states = new State[leaves.length];
+ for (int i = 0; i < leaves.length; i++) {
+ states[i] = new State(i, leaves[i], columnReaders[i], levelToClose[i], groupConverterPaths[i], leafConverters[i]);
+
+ int[] definitionLevelToDepth = new int[states[i].primitiveColumnIO.getDefinitionLevel() + 1];
+ // for each possible definition level, determine the depth at which to create groups
+ final ColumnIO[] path = states[i].primitiveColumnIO.getPath();
+ int depth = 0;
+ for (int d = 0; d < definitionLevelToDepth.length; ++d) {
+ while (depth < (states[i].fieldPath.length - 1)
+ && d >= path[depth + 1].getDefinitionLevel()
+ ) {
+ ++ depth;
+ }
+ definitionLevelToDepth[d] = depth - 1;
+ }
+ states[i].definitionLevelToDepth = definitionLevelToDepth;
+ }
+ for (int i = 0; i < leaves.length; i++) {
+ State state = states[i];
+ int[] nextStateIds = nextColumnIdxForRepLevel[i];
+ state.nextState = new State[nextStateIds.length];
+ for (int j = 0; j < nextStateIds.length; j++) {
+ state.nextState[j] = nextStateIds[j] == states.length ? null : states[nextStateIds[j]];
+ }
+ }
+ for (int i = 0; i < states.length; i++) {
+ State state = states[i];
+ final Map<Case, Case> definedCases = new HashMap<Case, Case>();
+ final Map<Case, Case> undefinedCases = new HashMap<Case, Case>();
+ Case[][][] caseLookup = new Case[state.fieldPath.length][][];
+ for (int currentLevel = 0; currentLevel < state.fieldPath.length; ++ currentLevel) {
+ caseLookup[currentLevel] = new Case[state.maxDefinitionLevel+1][];
+ for (int d = 0; d <= state.maxDefinitionLevel; ++ d) {
+ caseLookup[currentLevel][d] = new Case[state.maxRepetitionLevel+1];
+ for (int nextR = 0; nextR <= state.maxRepetitionLevel; ++ nextR) {
+ int caseStartLevel = currentLevel;
+ int caseDepth = Math.max(state.getDepth(d), caseStartLevel - 1);
+ int caseNextLevel = Math.min(state.nextLevel[nextR], caseDepth + 1);
+ Case currentCase = new Case(caseStartLevel, caseDepth, caseNextLevel, getNextReader(state.id, nextR), d == state.maxDefinitionLevel);
+ Map<Case, Case> cases = currentCase.isDefined() ? definedCases : undefinedCases;
+ if (!cases.containsKey(currentCase)) {
+ currentCase.setID(cases.size());
+ cases.put(currentCase, currentCase);
+ } else {
+ currentCase = cases.get(currentCase);
+ }
+ caseLookup[currentLevel][d][nextR] = currentCase;
+ }
+ }
+ }
+ state.caseLookup = caseLookup;
+ state.definedCases = new ArrayList<Case>(definedCases.values());
+ state.undefinedCases = new ArrayList<Case>(undefinedCases.values());
+ Comparator<Case> caseComparator = new Comparator<Case>() {
+ @Override
+ public int compare(Case o1, Case o2) {
+ return o1.id - o2.id;
+ }
+ };
+ Collections.sort(state.definedCases, caseComparator);
+ Collections.sort(state.undefinedCases, caseComparator);
+ }
+ }
+
+ //TODO: have those wrappers for a converter
+ private RecordConsumer validator(RecordConsumer recordConsumer, boolean validating, MessageType schema) {
+ return validating ? new ValidatingRecordConsumer(recordConsumer, schema) : recordConsumer;
+ }
+
+ private RecordConsumer wrap(RecordConsumer recordConsumer) {
+ if (Log.DEBUG) {
+ return new RecordConsumerLoggingWrapper(recordConsumer);
+ }
+ return recordConsumer;
+ }
+
+ /**
+ * @see org.apache.parquet.io.RecordReader#read()
+ */
+ @Override
+ public T read() {
+ int currentLevel = 0;
+ recordRootConverter.start();
+ State currentState = states[0];
+ do {
+ ColumnReader columnReader = currentState.column;
+ int d = columnReader.getCurrentDefinitionLevel();
+ // creating needed nested groups until the current field (opening tags)
+ int depth = currentState.definitionLevelToDepth[d];
+ for (; currentLevel <= depth; ++currentLevel) {
+ currentState.groupConverterPath[currentLevel].start();
+ }
+ // currentLevel = depth + 1 at this point
+ // set the current value
+ if (d >= currentState.maxDefinitionLevel) {
+ // not null
+ columnReader.writeCurrentValueToConverter();
+ }
+ columnReader.consume();
+
+ int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
+ // level to go to close current groups
+ int next = currentState.nextLevel[nextR];
+ for (; currentLevel > next; currentLevel--) {
+ currentState.groupConverterPath[currentLevel - 1].end();
+ }
+
+ currentState = currentState.nextState[nextR];
+ } while (currentState != null);
+ recordRootConverter.end();
+ T record = recordMaterializer.getCurrentRecord();
+ shouldSkipCurrentRecord = record == null;
+ if (shouldSkipCurrentRecord) {
+ recordMaterializer.skipCurrentRecord();
+ }
+ return record;
+ }
+
+ @Override
+ public boolean shouldSkipCurrentRecord() {
+ return shouldSkipCurrentRecord;
+ }
+
+ private static void log(String string) {
+ LOG.debug(string);
+ }
+
+ int getNextReader(int current, int nextRepetitionLevel) {
+ State nextState = states[current].nextState[nextRepetitionLevel];
+ return nextState == null ? states.length : nextState.id;
+ }
+
+ int getNextLevel(int current, int nextRepetitionLevel) {
+ return states[current].nextLevel[nextRepetitionLevel];
+ }
+
+ private int getCommonParentLevel(String[] previous, String[] next) {
+ int i = 0;
+ while (i < Math.min(previous.length, next.length) && previous[i].equals(next[i])) {
+ ++i;
+ }
+ return i;
+ }
+
+ protected int getStateCount() {
+ return states.length;
+ }
+
+ protected State getState(int i) {
+ return states[i];
+ }
+
+ protected RecordMaterializer<T> getMaterializer() {
+ return recordMaterializer;
+ }
+
+ protected Converter getRecordConsumer() {
+ return recordRootConverter;
+ }
+
+ protected Iterable<ColumnReader> getColumnReaders() {
+ // Converting the array to an iterable ensures that the array cannot be altered
+ return Arrays.asList(columnReaders);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
new file mode 100644
index 0000000..e1d3ba7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+
+/**
+ * Wraps a record consumer
+ * Validates the record written against the schema and passes down the event to the wrapped consumer
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ValidatingRecordConsumer extends RecordConsumer {
+ private static final Log LOG = Log.getLog(ValidatingRecordConsumer.class);
+ private static final boolean DEBUG = Log.DEBUG;
+
+ private final RecordConsumer delegate;
+
+ private Deque<Type> types = new ArrayDeque<Type>();
+ private Deque<Integer> fields = new ArrayDeque<Integer>();
+ private Deque<Integer> previousField = new ArrayDeque<Integer>();
+ private Deque<Integer> fieldValueCount = new ArrayDeque<Integer>();
+
+ /**
+ *
+ * @param delegate the consumer to pass down the event to
+ * @param schema the schema to validate against
+ */
+ public ValidatingRecordConsumer(RecordConsumer delegate, MessageType schema) {
+ this.delegate = delegate;
+ this.types.push(schema);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void startMessage() {
+ previousField.push(-1);
+ delegate.startMessage();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void endMessage() {
+ delegate.endMessage();
+ validateMissingFields(types.peek().asGroupType().getFieldCount());
+ previousField.pop();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void startField(String field, int index) {
+ if (index <= previousField.peek()) {
+ throw new InvalidRecordException("fields must be added in order " + field + " index " + index + " is before previous field " + previousField.peek());
+ }
+ validateMissingFields(index);
+ fields.push(index);
+ fieldValueCount.push(0);
+ delegate.startField(field, index);
+ }
+
+ private void validateMissingFields(int index) {
+ for (int i = previousField.peek() + 1; i < index; i++) {
+ Type type = types.peek().asGroupType().getType(i);
+ if (type.isRepetition(Repetition.REQUIRED)) {
+ throw new InvalidRecordException("required field is missing " + type);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void endField(String field, int index) {
+ delegate.endField(field, index);
+ fieldValueCount.pop();
+ previousField.push(fields.pop());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void startGroup() {
+ previousField.push(-1);
+ types.push(types.peek().asGroupType().getType(fields.peek()));
+ delegate.startGroup();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void endGroup() {
+ delegate.endGroup();
+ validateMissingFields(types.peek().asGroupType().getFieldCount());
+ types.pop();
+ previousField.pop();
+ }
+
+ private void validate(PrimitiveTypeName p) {
+ Type currentType = types.peek().asGroupType().getType(fields.peek());
+ int c = fieldValueCount.pop() + 1;
+ fieldValueCount.push(c);
+ if (DEBUG) LOG.debug("validate " + p + " for " + currentType.getName());
+ switch (currentType.getRepetition()) {
+ case OPTIONAL:
+ case REQUIRED:
+ if (c > 1) {
+ throw new InvalidRecordException("repeated value when the type is not repeated in " + currentType);
+ }
+ break;
+ case REPEATED:
+ break;
+ default:
+ throw new InvalidRecordException("unknown repetition " + currentType.getRepetition() + " in " + currentType);
+ }
+ if (!currentType.isPrimitive() || currentType.asPrimitiveType().getPrimitiveTypeName() != p) {
+ throw new InvalidRecordException("expected type " + p + " but got "+ currentType);
+ }
+ }
+
+ private void validate(PrimitiveTypeName... ptypes) {
+ Type currentType = types.peek().asGroupType().getType(fields.peek());
+ int c = fieldValueCount.pop() + 1;
+ fieldValueCount.push(c);
+ if (DEBUG) LOG.debug("validate " + Arrays.toString(ptypes) + " for " + currentType.getName());
+ switch (currentType.getRepetition()) {
+ case OPTIONAL:
+ case REQUIRED:
+ if (c > 1) {
+ throw new InvalidRecordException("repeated value when the type is not repeated in " + currentType);
+ }
+ break;
+ case REPEATED:
+ break;
+ default:
+ throw new InvalidRecordException("unknown repetition " + currentType.getRepetition() + " in " + currentType);
+ }
+ if (!currentType.isPrimitive()) {
+ throw new InvalidRecordException(
+ "expected type in " + Arrays.toString(ptypes) + " but got " + currentType);
+ }
+ for (PrimitiveTypeName p : ptypes) {
+ if (currentType.asPrimitiveType().getPrimitiveTypeName() == p) {
+ return; // type is valid
+ }
+ }
+ throw new InvalidRecordException(
+ "expected type in " + Arrays.toString(ptypes) + " but got " + currentType);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addInteger(int value) {
+ validate(INT32);
+ delegate.addInteger(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addLong(long value) {
+ validate(INT64);
+ delegate.addLong(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addBoolean(boolean value) {
+ validate(BOOLEAN);
+ delegate.addBoolean(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addBinary(Binary value) {
+ validate(BINARY, INT96, FIXED_LEN_BYTE_ARRAY);
+ delegate.addBinary(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addFloat(float value) {
+ validate(FLOAT);
+ delegate.addFloat(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addDouble(double value) {
+ validate(DOUBLE);
+ delegate.addDouble(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
new file mode 100644
index 0000000..d3abec5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.parquet.io.ParquetEncodingException;
+
+import static org.apache.parquet.bytes.BytesUtils.UTF8;
+
+abstract public class Binary implements Comparable<Binary>, Serializable {
+
+ // this isn't really something others should extend
+ private Binary() { }
+
+ public static final Binary EMPTY = fromByteArray(new byte[0]);
+
+ abstract public String toStringUsingUTF8();
+
+ abstract public int length();
+
+ abstract public void writeTo(OutputStream out) throws IOException;
+
+ abstract public void writeTo(DataOutput out) throws IOException;
+
+ abstract public byte[] getBytes();
+
+ abstract boolean equals(byte[] bytes, int offset, int length);
+
+ abstract boolean equals(Binary other);
+
+ abstract public int compareTo(Binary other);
+
+ abstract int compareTo(byte[] bytes, int offset, int length);
+
+ abstract public ByteBuffer toByteBuffer();
+
+ /**
+  * Compares this binary with an arbitrary object.
+  *
+  * @param obj the object to compare with, may be {@code null}
+  * @return {@code false} unless {@code obj} is a {@code Binary}; otherwise the result of
+  *         {@code equals(Binary)}
+  */
+ @Override
+ public boolean equals(Object obj) {
+   // instanceof is already false for null, so no separate null guard is required
+   return obj instanceof Binary && equals((Binary) obj);
+ }
+
+ /**
+  * Debug rendering: the byte count followed by the numeric value of every byte.
+  *
+  * @return a string of the form <code>Binary{N bytes, [b0, b1, ...]}</code>
+  */
+ @Override
+ public String toString() {
+   final int size = length();
+   final String content = Arrays.toString(getBytes());
+   return "Binary{" + size + " bytes, " + content + "}";
+ }
+
+ /**
+ * Binary backed by a region of a byte array: {@code length} bytes of {@code value} starting at
+ * {@code offset}. The array is kept by reference, not copied, and the bounds are not checked here.
+ */
+ private static class ByteArraySliceBackedBinary extends Binary {
+ private final byte[] value;
+ private final int offset;
+ private final int length;
+
+ public ByteArraySliceBackedBinary(byte[] value, int offset, int length) {
+ this.value = value;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ /** Decodes only the slice (not the whole backing array) as UTF-8. */
+ @Override
+ public String toStringUsingUTF8() {
+ return UTF8.decode(ByteBuffer.wrap(value, offset, length)).toString();
+ // TODO: figure out why the following line was much slower
+ // rdb: new String(...) is slower because it instantiates a new Decoder,
+ // while Charset#decode uses a thread-local decoder cache
+ // return new String(value, offset, length, BytesUtils.UTF8);
+ }
+
+ @Override
+ public int length() {
+ return length;
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(value, offset, length);
+ }
+
+ /** Returns a fresh copy of the slice, so the caller never sees the backing array itself. */
+ @Override
+ public byte[] getBytes() {
+ return Arrays.copyOfRange(value, offset, offset + length);
+ }
+
+ @Override
+ public int hashCode() {
+ return Binary.hashCode(value, offset, length);
+ }
+
+ // Double dispatch: hand this slice to the other binary, which compares it with its own bytes.
+ @Override
+ boolean equals(Binary other) {
+ return other.equals(value, offset, length);
+ }
+
+ @Override
+ boolean equals(byte[] other, int otherOffset, int otherLength) {
+ return Binary.equals(value, offset, length, other, otherOffset, otherLength);
+ }
+
+ // Double dispatch with swapped roles: here "other" becomes the receiver and this slice the argument.
+ @Override
+ public int compareTo(Binary other) {
+ return other.compareTo(value, offset, length);
+ }
+
+ // Compares this slice (first range) with the given range (second range). compareTwoByteArrays
+ // returns 1 when its first range is the smaller one, so the sign is reversed here; the role swap
+ // done by compareTo(Binary) relies on exactly this argument order.
+ @Override
+ int compareTo(byte[] other, int otherOffset, int otherLength) {
+ return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength);
+ }
+
+ /** Wraps the backing array without copying; the buffer covers exactly the slice. */
+ @Override
+ public ByteBuffer toByteBuffer() {
+ return ByteBuffer.wrap(value, offset, length);
+ }
+
+ @Override
+ public void writeTo(DataOutput out) throws IOException {
+ out.write(value, offset, length);
+ }
+
+ }
+
+ /**
+  * Array-backed binary created from a {@code String}; differs from its parent only in
+  * {@link #toString()}, which prints the decoded text instead of the raw bytes.
+  */
+ private static class FromStringBinary extends ByteArrayBackedBinary {
+   public FromStringBinary(byte[] value) {
+     super(value);
+   }
+
+   @Override
+   public String toString() {
+     final String text = toStringUsingUTF8();
+     return "Binary{\"" + text + "\"}";
+   }
+ }
+
+ /**
+ * Creates a binary over a region of {@code value} without copying it; later changes to the array
+ * are therefore visible through the returned binary.
+ *
+ * @param value the backing array
+ * @param offset index of the first byte of the region
+ * @param length number of bytes in the region
+ * @return a binary viewing {@code length} bytes of {@code value} from {@code offset}
+ */
+ public static Binary fromByteArray(final byte[] value, final int offset, final int length) {
+ return new ByteArraySliceBackedBinary(value, offset, length);
+ }
+
+ /**
+ * Binary backed by a whole byte array. The array is kept by reference, not copied.
+ */
+ private static class ByteArrayBackedBinary extends Binary {
+ private final byte[] value;
+
+ public ByteArrayBackedBinary(byte[] value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toStringUsingUTF8() {
+ return UTF8.decode(ByteBuffer.wrap(value)).toString();
+ }
+
+ @Override
+ public int length() {
+ return value.length;
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(value);
+ }
+
+ /** Returns the backing array itself (no copy), so caller modifications change this binary. */
+ @Override
+ public byte[] getBytes() {
+ return value;
+ }
+
+ @Override
+ public int hashCode() {
+ return Binary.hashCode(value, 0, value.length);
+ }
+
+ // Double dispatch: hand these bytes to the other binary, which compares them with its own.
+ @Override
+ boolean equals(Binary other) {
+ return other.equals(value, 0, value.length);
+ }
+
+ @Override
+ boolean equals(byte[] other, int otherOffset, int otherLength) {
+ return Binary.equals(value, 0, value.length, other, otherOffset, otherLength);
+ }
+
+ // Double dispatch with swapped roles: here "other" becomes the receiver and this array the argument.
+ @Override
+ public int compareTo(Binary other) {
+ return other.compareTo(value, 0, value.length);
+ }
+
+ // Compares this array (first range) with the given range (second range). compareTwoByteArrays
+ // returns 1 when its first range is the smaller one, so the sign is reversed here; the role swap
+ // done by compareTo(Binary) relies on exactly this argument order.
+ @Override
+ int compareTo(byte[] other, int otherOffset, int otherLength) {
+ return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength);
+ }
+
+ /** Wraps the backing array without copying. */
+ @Override
+ public ByteBuffer toByteBuffer() {
+ return ByteBuffer.wrap(value);
+ }
+
+ @Override
+ public void writeTo(DataOutput out) throws IOException {
+ out.write(value);
+ }
+
+ }
+
+ /**
+ * Creates a binary over the whole of {@code value} without copying it; later changes to the array
+ * are therefore visible through the returned binary.
+ *
+ * @param value the backing array
+ * @return a binary viewing all bytes of {@code value}
+ */
+ public static Binary fromByteArray(final byte[] value) {
+ return new ByteArrayBackedBinary(value);
+ }
+
+ /**
+  * Binary backed by the remaining bytes (position to limit) of a {@link ByteBuffer}.
+  * The buffer is kept by reference, so the content of this binary follows the buffer's
+  * position, limit and bytes.
+  * <p>
+  * The field is {@code transient}; serialization writes the bytes explicitly and
+  * deserialization rebuilds a heap buffer around them.
+  */
+ private static class ByteBufferBackedBinary extends Binary {
+   private transient ByteBuffer value;
+
+   public ByteBufferBackedBinary(ByteBuffer value) {
+     this.value = value;
+   }
+
+   /** Index in {@code value.array()} of the first remaining byte; only valid when {@code value.hasArray()}. */
+   private int arrayStart() {
+     return value.arrayOffset() + value.position();
+   }
+
+   @Override
+   public String toStringUsingUTF8() {
+     // Decode a duplicate: decoding advances the position of the buffer it reads, which would
+     // leave this binary empty after the first call.
+     return UTF8.decode(value.duplicate()).toString();
+   }
+
+   @Override
+   public int length() {
+     return value.remaining();
+   }
+
+   @Override
+   public void writeTo(OutputStream out) throws IOException {
+     // TODO: should not have to materialize those bytes
+     out.write(getBytes());
+   }
+
+   /** Returns a fresh copy of the remaining bytes; position, limit and mark of the buffer are untouched. */
+   @Override
+   public byte[] getBytes() {
+     byte[] bytes = new byte[value.remaining()];
+     // Read through a duplicate instead of mark()/reset(), which would overwrite a mark set by
+     // whoever shares the buffer.
+     value.duplicate().get(bytes);
+     return bytes;
+   }
+
+   @Override
+   public int hashCode() {
+     if (value.hasArray()) {
+       // the third argument is a length, so it must not include arrayOffset()
+       return Binary.hashCode(value.array(), arrayStart(), value.remaining());
+     }
+     byte[] bytes = getBytes();
+     return Binary.hashCode(bytes, 0, bytes.length);
+   }
+
+   // Double dispatch: hand these bytes to the other binary, which compares them with its own.
+   @Override
+   boolean equals(Binary other) {
+     if (value.hasArray()) {
+       return other.equals(value.array(), arrayStart(), value.remaining());
+     }
+     byte[] bytes = getBytes();
+     return other.equals(bytes, 0, bytes.length);
+   }
+
+   @Override
+   boolean equals(byte[] other, int otherOffset, int otherLength) {
+     if (value.hasArray()) {
+       return Binary.equals(value.array(), arrayStart(), value.remaining(), other, otherOffset, otherLength);
+     }
+     byte[] bytes = getBytes();
+     return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+   }
+
+   // Double dispatch with swapped roles: here "other" becomes the receiver and these bytes the argument.
+   @Override
+   public int compareTo(Binary other) {
+     if (value.hasArray()) {
+       return other.compareTo(value.array(), arrayStart(), value.remaining());
+     }
+     byte[] bytes = getBytes();
+     return other.compareTo(bytes, 0, bytes.length);
+   }
+
+   // These bytes are the first range; the argument order must stay as is because
+   // compareTo(Binary) relies on it.
+   @Override
+   int compareTo(byte[] other, int otherOffset, int otherLength) {
+     if (value.hasArray()) {
+       return Binary.compareTwoByteArrays(value.array(), arrayStart(), value.remaining(),
+           other, otherOffset, otherLength);
+     }
+     byte[] bytes = getBytes();
+     return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+   }
+
+   /** Returns the backing buffer itself (not a duplicate); moving its position changes this binary. */
+   @Override
+   public ByteBuffer toByteBuffer() {
+     return value;
+   }
+
+   @Override
+   public void writeTo(DataOutput out) throws IOException {
+     // TODO: should not have to materialize those bytes
+     out.write(getBytes());
+   }
+
+   /** Serialized form: an int byte count followed by that many bytes. */
+   private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+     byte[] bytes = getBytes();
+     out.writeInt(bytes.length);
+     out.write(bytes);
+   }
+
+   /** Reads the form written by {@code writeObject} and wraps the bytes in a new heap buffer. */
+   private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+     int length = in.readInt();
+     byte[] bytes = new byte[length];
+     in.readFully(bytes, 0, length);
+     this.value = ByteBuffer.wrap(bytes);
+   }
+
+   /** Falls back to an empty buffer when the stream carries no data for this class. */
+   private void readObjectNoData() throws ObjectStreamException {
+     this.value = ByteBuffer.wrap(new byte[0]);
+   }
+
+ }
+
+ /**
+ * Creates a binary over the remaining bytes of {@code value}. The buffer is kept by reference,
+ * not copied, so the binary's length is whatever {@code value.remaining()} reports at the time of use.
+ *
+ * @param value the backing buffer
+ * @return a binary viewing the buffer's remaining bytes
+ */
+ public static Binary fromByteBuffer(final ByteBuffer value) {
+ return new ByteBufferBackedBinary(value);
+ }
+
+ /**
+  * Creates a binary holding the UTF-8 encoding of {@code value}.
+  *
+  * @param value the text to encode
+  * @return a binary whose {@code toString()} shows the text rather than the raw bytes
+  * @throws ParquetEncodingException if the runtime reports UTF-8 as unsupported
+  */
+ public static Binary fromString(final String value) {
+   final byte[] utf8Bytes;
+   try {
+     utf8Bytes = value.getBytes("UTF-8");
+   } catch (UnsupportedEncodingException e) {
+     throw new ParquetEncodingException("UTF-8 not supported.", e);
+   }
+   return new FromStringBinary(utf8Bytes);
+ }
+
+ /**
+  * Hashes {@code length} bytes of {@code array} starting at {@code offset}, using the same
+  * 31-based polynomial as {@link Arrays#hashCode(byte[])}.
+  *
+  * @param array the array holding the bytes
+  * @param offset index of the first byte to hash
+  * @param length number of bytes to hash
+  * @return the hash of the range; 1 when {@code length} is 0
+  * @see Arrays#hashCode(byte[])
+  */
+ private static final int hashCode(byte[] array, int offset, int length) {
+   final int end = offset + length;
+   int hash = 1;
+   int index = offset;
+   while (index < end) {
+     hash = 31 * hash + array[index];
+     index++;
+   }
+   return hash;
+ }
+
+ /**
+ * @see {@link Arrays#equals(byte[], byte[])}
+ * @param array1
+ * @param offset1
+ * @param length1
+ * @param array2
+ * @param offset2
+ * @param length2
+ * @return
+ */
+ private static final boolean equals(byte[] array1, int offset1, int length1, byte[] array2, int offset2, int length2) {
+ if (array1 == null && array2 == null) return true;
+ if (array1 == null || array2 == null) return false;
+ if (length1 != length2) return false;
+ if (array1 == array2 && offset1 == offset2) return true;
+ for (int i = 0; i < length1; i++) {
+ if (array1[i + offset1] != array2[i + offset2]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Lexicographically compares two byte ranges, byte by byte, using signed {@code byte} comparison.
+ * <p>
+ * The sign is reversed relative to {@link Comparable#compareTo}: the result is 1 when the FIRST
+ * range orders before the second (smaller byte at the first difference, or a proper prefix of the
+ * second) and -1 when it orders after. The {@code compareTo(Binary)} implementations in this class
+ * hand their own bytes to {@code other.compareTo(byte[], int, int)}, so the original receiver's
+ * bytes arrive here as the second range.
+ *
+ * @param array1 array holding the first range
+ * @param offset1 index of the first byte of the first range
+ * @param length1 number of bytes in the first range
+ * @param array2 array holding the second range
+ * @param offset2 index of the first byte of the second range
+ * @param length2 number of bytes in the second range
+ * @return 0 if the ranges are equal, 1 if the first range is smaller, -1 if it is larger
+ */
+ private static final int compareTwoByteArrays(byte[] array1, int offset1, int length1,
+ byte[] array2, int offset2, int length2) {
+ // only the both-null case is handled; a single null array is not checked for
+ if (array1 == null && array2 == null) return 0;
+ // identical range of the same array: skip the scan
+ if (array1 == array2 && offset1 == offset2 && length1 == length2) return 0;
+ int min_length = (length1 < length2) ? length1 : length2;
+ for (int i = 0; i < min_length; i++) {
+ // reversed sign on purpose, see the method comment
+ if (array1[i + offset1] < array2[i + offset2]) {
+ return 1;
+ }
+ if (array1[i + offset1] > array2[i + offset2]) {
+ return -1;
+ }
+ }
+ // check remainder
+ // common prefix is identical: the shorter range is the smaller one (same reversed sign)
+ if (length1 == length2) { return 0; }
+ else if (length1 < length2) { return 1;}
+ else { return -1; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java
new file mode 100644
index 0000000..648e245
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+/**
+ * Base node of the converter tree that materializes tuples.
+ * A node is either primitive or a group; the {@code as...Converter()} accessors reject the
+ * wrong kind with a {@link ClassCastException} unless a subclass overrides them.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class Converter {
+
+  /**
+   * @return whether this node is a primitive converter
+   */
+  abstract public boolean isPrimitive();
+
+  /**
+   * @return this node as a primitive converter
+   * @throws ClassCastException in this base implementation, always
+   */
+  public PrimitiveConverter asPrimitiveConverter() {
+    final String actual = getClass().getName();
+    throw new ClassCastException("Expected instance of primitive converter but got \"" + actual + "\"");
+  }
+
+  /**
+   * @return this node as a group converter
+   * @throws ClassCastException in this base implementation, always
+   */
+  public GroupConverter asGroupConverter() {
+    final String actual = getClass().getName();
+    throw new ClassCastException("Expected instance of group converter but got \"" + actual + "\"");
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java b/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java
new file mode 100644
index 0000000..823e388
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+
+/**
+ * Converter for group nodes: it hands out one child converter per field and is told when each
+ * group instance starts and ends.
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class GroupConverter extends Converter {
+
+ /** @return always {@code false}: a group converter is never primitive */
+ @Override
+ public boolean isPrimitive() {
+ return false;
+ }
+
+ /** @return this instance, replacing the throwing default of {@link Converter} */
+ @Override
+ public GroupConverter asGroupConverter() {
+ return this;
+ }
+
+ /**
+ * called at initialization based on schema
+ * must consistently return the same object
+ * @param fieldIndex index of the field in this group
+ * @return the corresponding converter
+ */
+ abstract public Converter getConverter(int fieldIndex);
+
+ /** runtime calls **/
+
+ /** called at the beginning of the group managed by this converter */
+ abstract public void start();
+
+ /**
+ * called at the end of the group managed by this converter
+ */
+ abstract public void end();
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java b/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java
new file mode 100644
index 0000000..763c6fd
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+import org.apache.parquet.column.Dictionary;
+
+/**
+ * Converter for the leaves of the schema.
+ * <p>
+ * Every value-receiving method here throws {@link UnsupportedOperationException} (carrying the
+ * concrete class name) by default; a subclass overrides the ones it supports.
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class PrimitiveConverter extends Converter {
+
+ /** @return always {@code true}: this is a primitive converter */
+ @Override
+ public boolean isPrimitive() {
+ return true;
+ }
+
+ /** @return this instance, replacing the throwing default of {@link Converter} */
+ @Override
+ public PrimitiveConverter asPrimitiveConverter() {
+ return this;
+ }
+
+ /**
+ * if it returns true we will attempt to use dictionary based conversion instead
+ * @return if dictionary is supported; {@code false} unless overridden
+ */
+ public boolean hasDictionarySupport() {
+ return false;
+ }
+
+ /**
+ * Set the dictionary to use if the data was encoded using dictionary encoding
+ * and the converter hasDictionarySupport().
+ * @param dictionary the dictionary to use for conversion
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void setDictionary(Dictionary dictionary) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /** runtime calls **/
+
+ /**
+ * add a value based on the dictionary set with setDictionary()
+ * Will be used if the Converter has dictionary support and the data was encoded using a dictionary
+ * @param dictionaryId the id in the dictionary of the value to add
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void addValueFromDictionary(int dictionaryId) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param value the binary value to add
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void addBinary(Binary value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param value the boolean value to add
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void addBoolean(boolean value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param value the double value to add
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void addDouble(double value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param value the float value to add
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void addFloat(float value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param value the int value to add
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void addInt(int value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ /**
+ * @param value the long value to add
+ * @throws UnsupportedOperationException unless overridden
+ */
+ public void addLong(long value) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
new file mode 100644
index 0000000..953d87a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+
+/**
+ *
+ * Abstraction for writing records.
+ * It decouples the striping algorithm from the actual record model.
+ * Values are added with the typed {@code add...} methods declared below.
+ * Example:
+ * <pre>
+ * startMessage()
+ * startField("A", 0)
+ * addInteger(1)
+ * addInteger(2)
+ * endField("A", 0)
+ * startField("B", 1)
+ * startGroup()
+ * startField("C", 0)
+ * addInteger(3)
+ * endField("C", 0)
+ * endGroup()
+ * endField("B", 1)
+ * endMessage()
+ * </pre>
+ *
+ * would produce the following message:
+ * <pre>
+ * {
+ * A: [1, 2]
+ * B: {
+ * C: 3
+ * }
+ * }
+ * </pre>
+ * @author Julien Le Dem
+ *
+ */
+abstract public class RecordConsumer {
+
+ /**
+ * start a new record
+ */
+ abstract public void startMessage();
+
+ /**
+ * end of a record
+ */
+ abstract public void endMessage();
+
+ /**
+ * start of a field in a group or message
+ * if the field is repeated the field is started only once and all values added in between start and end
+ * @param field name of the field
+ * @param index of the field in the group or message
+ */
+ abstract public void startField(String field, int index);
+
+ /**
+ * end of a field in a group or message
+ * @param field name of the field
+ * @param index of the field in the group or message
+ */
+ abstract public void endField(String field, int index);
+
+ /**
+ * start of a group in a field
+ */
+ abstract public void startGroup();
+
+ /**
+ * end of a group in a field
+ */
+ abstract public void endGroup();
+
+ /**
+ * add an int value in the current field
+ * @param value the int value to add
+ */
+ abstract public void addInteger(int value);
+
+ /**
+ * add a long value in the current field
+ * @param value the long value to add
+ */
+ abstract public void addLong(long value);
+
+ /**
+ * add a boolean value in the current field
+ * @param value the boolean value to add
+ */
+ abstract public void addBoolean(boolean value);
+
+ /**
+ * add a binary value in the current field
+ * @param value the binary value to add
+ */
+ abstract public void addBinary(Binary value);
+
+ /**
+ * add a float value in the current field
+ * @param value the float value to add
+ */
+ abstract public void addFloat(float value);
+
+ /**
+ * add a double value in the current field
+ * @param value the double value to add
+ */
+ abstract public void addDouble(double value);
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
new file mode 100644
index 0000000..98e4d50
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+/**
+ * Top-level class which should be implemented in order to materialize objects from
+ * a stream of Parquet data.
+ *
+ * Each record will be wrapped by {@link GroupConverter#start()} and {@link GroupConverter#end()},
+ * between which the appropriate fields will be materialized.
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> the materialized object class
+ */
+abstract public class RecordMaterializer<T> {
+
+ /**
+ * @return the result of the conversion
+ */
+ abstract public T getCurrentRecord();
+
+ /**
+ * Called if {@link #getCurrentRecord()} isn't going to be called.
+ * The default implementation does nothing; override it to discard per-record state.
+ */
+ public void skipCurrentRecord() { }
+
+ /**
+ * @return the root converter for this tree
+ */
+ abstract public GroupConverter getRootConverter();
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
new file mode 100644
index 0000000..0f68fc3
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+
+import static org.apache.parquet.schema.OriginalType.*;
+
+/**
+ * Factory helpers that build the Parquet group structures used for Java-like
+ * list and map types.
+ */
+public abstract class ConversionPatterns {
+  /**
+   * Wraps a repeated field in an annotated group, which preserves the difference between an
+   * empty list and null when the wrapper is optional.
+   *
+   * @param repetition repetition of the wrapping group
+   * @param alias name of the field
+   * @param originalType annotation put on the wrapping group
+   * @param nested the nested repeated field
+   * @return a group type
+   * @throws IllegalArgumentException if {@code nested} is not repeated
+   */
+  private static GroupType listWrapper(Repetition repetition, String alias, OriginalType originalType, Type nested) {
+    if (nested.isRepetition(Repetition.REPEATED)) {
+      return new GroupType(repetition, alias, originalType, nested);
+    }
+    throw new IllegalArgumentException("Nested type should be repeated: " + nested);
+  }
+
+  /**
+   * Builds a map type whose repeated key/value group is named "map".
+   */
+  public static GroupType mapType(Repetition repetition, String alias, Type keyType, Type valueType) {
+    final String defaultMapAlias = "map";
+    return mapType(repetition, alias, defaultMapAlias, keyType, valueType);
+  }
+
+  /**
+   * Builds a map type whose key is a required UTF8-annotated binary field named "key".
+   */
+  public static GroupType stringKeyMapType(Repetition repetition, String alias, String mapAlias, Type valueType) {
+    final Type stringKey =
+        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key", OriginalType.UTF8);
+    return mapType(repetition, alias, mapAlias, stringKey, valueType);
+  }
+
+  /**
+   * Same as the four-argument variant, with the repeated key/value group named "map".
+   */
+  public static GroupType stringKeyMapType(Repetition repetition, String alias, Type valueType) {
+    final String defaultMapAlias = "map";
+    return stringKeyMapType(repetition, alias, defaultMapAlias, valueType);
+  }
+
+  /**
+   * Builds a MAP-annotated group around a repeated MAP_KEY_VALUE group.
+   *
+   * @param repetition repetition of the map field
+   * @param alias name of the map field
+   * @param mapAlias name of the repeated key/value group
+   * @param keyType type of the key
+   * @param valueType type of the value, which must be named "value"; {@code null} builds a
+   *        key-only group (supports projection on the key of a map)
+   * @return the map group type
+   * @throws RuntimeException if {@code valueType} is given but not named "value"
+   */
+  public static GroupType mapType(Repetition repetition, String alias, String mapAlias, Type keyType, Type valueType) {
+    final GroupType keyValue;
+    if (valueType == null) {
+      // support projection only on key of a map
+      keyValue = new GroupType(Repetition.REPEATED, mapAlias, MAP_KEY_VALUE, keyType);
+    } else {
+      final String valueName = valueType.getName();
+      if (!valueName.equals("value")) {
+        throw new RuntimeException(valueType.getName() + " should be value");
+      }
+      keyValue = new GroupType(Repetition.REPEATED, mapAlias, MAP_KEY_VALUE, keyType, valueType);
+    }
+    return listWrapper(repetition, alias, MAP, keyValue);
+  }
+
+  /**
+   * Builds a LIST-annotated group around a repeated field.
+   *
+   * @param repetition repetition of the list field
+   * @param alias name of the field
+   * @param nestedType the repeated element field
+   * @return the list group type
+   * @throws IllegalArgumentException if {@code nestedType} is not repeated
+   */
+  public static GroupType listType(Repetition repetition, String alias, Type nestedType) {
+    return listWrapper(repetition, alias, LIST, nestedType);
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java b/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java
new file mode 100644
index 0000000..446b956
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+/**
+ * Immutable holder for the precision and scale of a decimal type.
+ * Two instances are equal when both values match.
+ */
+public class DecimalMetadata {
+  private final int precision;
+  private final int scale;
+
+  /**
+   * @param precision the precision to store
+   * @param scale the scale to store
+   */
+  public DecimalMetadata(int precision, int scale) {
+    this.precision = precision;
+    this.scale = scale;
+  }
+
+  /** @return the precision given at construction */
+  public int getPrecision() {
+    return precision;
+  }
+
+  /** @return the scale given at construction */
+  public int getScale() {
+    return scale;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // exact class match: a subclass instance is never equal to a base instance
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final DecimalMetadata other = (DecimalMetadata) o;
+    return precision == other.precision && scale == other.scale;
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * precision + scale;
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
new file mode 100644
index 0000000..027fbc0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import static java.util.Arrays.asList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.io.InvalidRecordException;
+
+/**
+ * Represents a group type: a named, ordered list of fields.
+ *
+ * Fields can be looked up by position or by name; the name lookup is backed
+ * by an index that is built once in the constructor.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class GroupType extends Type {
+
+  private final List<Type> fields;
+  // position of each field in 'fields', keyed by field name
+  private final Map<String, Integer> indexByName;
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param fields the contained fields
+   */
+  public GroupType(Repetition repetition, String name, List<Type> fields) {
+    this(repetition, name, null, fields, null);
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param fields the contained fields
+   */
+  public GroupType(Repetition repetition, String name, Type... fields) {
+    this(repetition, name, Arrays.asList(fields));
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  @Deprecated
+  public GroupType(Repetition repetition, String name, OriginalType originalType, Type... fields) {
+    this(repetition, name, originalType, Arrays.asList(fields));
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  @Deprecated
+  public GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields) {
+    this(repetition, name, originalType, fields, null);
+  }
+
+  /**
+   * Stores the given list as-is (it is not copied) and indexes the fields by name.
+   * If several fields share a name, the index keeps the position of the last one.
+   *
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   * @param id the id of the field
+   */
+  GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields, ID id) {
+    super(name, repetition, originalType, id);
+    this.fields = fields;
+    this.indexByName = new HashMap<String, Integer>();
+    for (int i = 0; i < fields.size(); i++) {
+      indexByName.put(fields.get(i).getName(), i);
+    }
+  }
+
+  /**
+   * @param id the field id
+   * @return a new GroupType with the same fields and a new id
+   */
+  @Override
+  public GroupType withId(int id) {
+    return new GroupType(getRepetition(), getName(), getOriginalType(), fields, new ID(id));
+  }
+
+  /**
+   * @param newFields the fields of the returned group
+   * @return a group with the same attributes and new fields.
+   */
+  public GroupType withNewFields(List<Type> newFields) {
+    return new GroupType(getRepetition(), getName(), getOriginalType(), newFields, getId());
+  }
+
+  /**
+   * @param newFields the fields of the returned group
+   * @return a group with the same attributes and new fields.
+   */
+  public GroupType withNewFields(Type... newFields) {
+    return withNewFields(asList(newFields));
+  }
+
+  /**
+   * returns the name of the corresponding field
+   * @param index the index of the desired field in this type
+   * @return the name of the field at this index
+   */
+  public String getFieldName(int index) {
+    return fields.get(index).getName();
+  }
+
+  /**
+   * @param name the requested name
+   * @return whether this type contains a field with that name
+   */
+  public boolean containsField(String name) {
+    return indexByName.containsKey(name);
+  }
+
+  /**
+   * @param name the name of the requested field
+   * @return the index of the field with that name
+   * @throws InvalidRecordException if this group has no field with that name
+   */
+  public int getFieldIndex(String name) {
+    // the index never holds null values, so a null result means "no such field"
+    Integer index = indexByName.get(name);
+    if (index == null) {
+      throw new InvalidRecordException(name + " not found in " + this);
+    }
+    return index;
+  }
+
+  /**
+   * @return the fields contained in this type (the internal list, not a copy)
+   */
+  public List<Type> getFields() {
+    return fields;
+  }
+
+  /**
+   * @return the number of fields in this type
+   */
+  public int getFieldCount() {
+    return fields.size();
+  }
+
+  /**
+   * @return false
+   */
+  @Override
+  public boolean isPrimitive() {
+    return false;
+  }
+
+  /**
+   * @param fieldName the name of the requested field
+   * @return the type of this field by name
+   * @throws InvalidRecordException if this group has no field with that name
+   */
+  public Type getType(String fieldName) {
+    return getType(getFieldIndex(fieldName));
+  }
+
+  /**
+   * @param index the position of the requested field
+   * @return the type of this field by index
+   */
+  public Type getType(int index) {
+    return fields.get(index);
+  }
+
+  /**
+   * appends a display string for each of the members of this group to sb,
+   * one per line; primitive members are terminated with a ';'
+   * @param sb where to append
+   * @param indent the indentation level
+   */
+  void membersDisplayString(StringBuilder sb, String indent) {
+    for (Type field : fields) {
+      field.writeToStringBuilder(sb, indent);
+      if (field.isPrimitive()) {
+        sb.append(";");
+      }
+      sb.append("\n");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void writeToStringBuilder(StringBuilder sb, String indent) {
+    sb.append(indent)
+        // fixed locale: the default-locale toLowerCase() can alter the keyword
+        // (e.g. 'I' becomes a dotless i under a Turkish default locale)
+        .append(getRepetition().name().toLowerCase(java.util.Locale.ENGLISH))
+        .append(" group ")
+        .append(getName())
+        .append(getOriginalType() == null ? "" : " (" + getOriginalType() +")")
+        .append(getId() == null ? "" : " = " + getId())
+        .append(" {\n");
+    membersDisplayString(sb, indent + " ");
+    sb.append(indent)
+        .append("}");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void accept(TypeVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  @Override @Deprecated
+  protected int typeHashCode() {
+    return hashCode();
+  }
+
+  @Override @Deprecated
+  protected boolean typeEquals(Type other) {
+    return equals(other);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode() * 31 + getFields().hashCode();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected boolean equals(Type otherType) {
+    return
+        !otherType.isPrimitive()
+        && super.equals(otherType)
+        && getFields().equals(otherType.asGroupType().getFields());
+  }
+
+  /**
+   * Counts 1 for this group if it is REPEATED, then adds the levels of the
+   * field named path[depth], recursively, until the end of the path.
+   */
+  @Override
+  protected int getMaxRepetitionLevel(String[] path, int depth) {
+    int myVal = isRepetition(Repetition.REPEATED) ? 1 : 0;
+    if (depth == path.length) {
+      return myVal;
+    }
+    return myVal + getType(path[depth]).getMaxRepetitionLevel(path, depth + 1);
+  }
+
+  /**
+   * Counts 1 for this group unless it is REQUIRED, then adds the levels of the
+   * field named path[depth], recursively, until the end of the path.
+   */
+  @Override
+  protected int getMaxDefinitionLevel(String[] path, int depth) {
+    int myVal = !isRepetition(Repetition.REQUIRED) ? 1 : 0;
+    if (depth == path.length) {
+      return myVal;
+    }
+    return myVal + getType(path[depth]).getMaxDefinitionLevel(path, depth + 1);
+  }
+
+  /**
+   * @return this group when the path is exhausted, otherwise the type found by
+   *         following path[depth] into the matching field
+   */
+  @Override
+  protected Type getType(String[] path, int depth) {
+    if (depth == path.length) {
+      return this;
+    }
+    return getType(path[depth]).getType(path, depth + 1);
+  }
+
+  /**
+   * @return false when the path ends on this group; otherwise whether the field
+   *         named path[depth] exists and contains the rest of the path
+   */
+  @Override
+  protected boolean containsPath(String[] path, int depth) {
+    if (depth == path.length) {
+      return false;
+    }
+    return containsField(path[depth]) && getType(path[depth]).containsPath(path, depth + 1);
+  }
+
+  /**
+   * Collects the paths produced by every field and writes the field name into
+   * slot 'depth' of each of them.
+   */
+  @Override
+  protected List<String[]> getPaths(int depth) {
+    List<String[]> result = new ArrayList<String[]>();
+    for (Type field : fields) {
+      List<String[]> paths = field.getPaths(depth + 1);
+      for (String[] path : paths) {
+        path[depth] = field.getName();
+        result.add(path);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  void checkContains(Type subType) {
+    super.checkContains(subType);
+    checkGroupContains(subType);
+  }
+
+  /**
+   * Checks that subType is a group and that each of its fields is contained in
+   * the field of this group that has the same name.
+   * @param subType the type expected to be contained in this group
+   * @throws InvalidRecordException if subType is primitive or names a field this group does not have
+   */
+  void checkGroupContains(Type subType) {
+    if (subType.isPrimitive()) {
+      throw new InvalidRecordException(subType + " found: expected " + this);
+    }
+    List<Type> subFields = subType.asGroupType().getFields();
+    for (Type otherType : subFields) {
+      Type thisType = this.getType(otherType.getName());
+      thisType.checkContains(otherType);
+    }
+  }
+
+  @Override
+  <T> T convert(List<GroupType> path, TypeConverter<T> converter) {
+    // children see a path that ends with this group; the converter for this
+    // group itself receives the path it was reached through
+    List<GroupType> childrenPath = new ArrayList<GroupType>(path);
+    childrenPath.add(this);
+    final List<T> children = convertChildren(childrenPath, converter);
+    return converter.convertGroupType(path, this, children);
+  }
+
+  /**
+   * @param path the groups leading to (and including) this one
+   * @param converter the converter to apply to each field
+   * @return the converted fields, in field order
+   */
+  protected <T> List<T> convertChildren(List<GroupType> path, TypeConverter<T> converter) {
+    List<T> children = new ArrayList<T>(fields.size());
+    for (Type field : fields) {
+      children.add(field.convert(path, converter));
+    }
+    return children;
+  }
+
+  @Override
+  protected Type union(Type toMerge) {
+    return union(toMerge, true);
+  }
+
+  /**
+   * Merges another group into this one. The result takes the repetition of
+   * toMerge and the name of this group; it is built without original type or id.
+   *
+   * @param toMerge the group to merge into this one
+   * @param strict should schema primitive types match
+   * @return the merged group
+   * @throws IncompatibleSchemaModificationException if toMerge is a primitive type
+   */
+  @Override
+  protected Type union(Type toMerge, boolean strict) {
+    if (toMerge.isPrimitive()) {
+      throw new IncompatibleSchemaModificationException("can not merge primitive type " + toMerge + " into group type " + this);
+    }
+    // forward 'strict' so nested fields are merged with the mode the caller asked for
+    return new GroupType(toMerge.getRepetition(), getName(), mergeFields(toMerge.asGroupType(), strict));
+  }
+
+  /**
+   * produces the list of fields resulting from merging toMerge into the fields of this
+   * @param toMerge the group containing the fields to merge
+   * @return the merged list
+   */
+  List<Type> mergeFields(GroupType toMerge) {
+    return mergeFields(toMerge, true);
+  }
+
+  /**
+   * produces the list of fields resulting from merging toMerge into the fields of this
+   * @param toMerge the group containing the fields to merge
+   * @param strict should schema primitive types match
+   * @return the merged list: the fields of this group first (merged where toMerge
+   *         has a field of the same name), then the fields only present in toMerge
+   * @throws IncompatibleSchemaModificationException if a field of toMerge has a more
+   *         restrictive repetition than the field of the same name in this group
+   */
+  List<Type> mergeFields(GroupType toMerge, boolean strict) {
+    List<Type> newFields = new ArrayList<Type>();
+    // merge existing fields
+    for (Type type : this.getFields()) {
+      Type merged;
+      if (toMerge.containsField(type.getName())) {
+        Type fieldToMerge = toMerge.getType(type.getName());
+        if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
+          throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
+        }
+        merged = type.union(fieldToMerge, strict);
+      } else {
+        merged = type;
+      }
+      newFields.add(merged);
+    }
+    // add new fields
+    for (Type type : toMerge.getFields()) {
+      if (!this.containsField(type.getName())) {
+        newFields.add(type);
+      }
+    }
+    return newFields;
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java b/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java
new file mode 100644
index 0000000..cde5e75
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * Signals that schemas are incompatible, e.g. when we are trying to read
+ * together files whose schemas can not be combined.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class IncompatibleSchemaModificationException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates an exception without message or cause.
+   */
+  public IncompatibleSchemaModificationException() {
+  }
+
+  /**
+   * @param message the description of the incompatibility
+   */
+  public IncompatibleSchemaModificationException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause the underlying error
+   */
+  public IncompatibleSchemaModificationException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message the description of the incompatibility
+   * @param cause the underlying error
+   */
+  public IncompatibleSchemaModificationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
new file mode 100644
index 0000000..1e26ed2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.InvalidRecordException;
+
+/**
+ * The root of a schema: a group that is always created with REPEATED repetition.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public final class MessageType extends GroupType {
+
+  /**
+   * @param name the name of the type
+   * @param fields the fields contained by this message
+   */
+  public MessageType(String name, Type... fields) {
+    super(Repetition.REPEATED, name, fields);
+  }
+
+  /**
+   * @param name the name of the type
+   * @param fields the fields contained by this message
+   */
+  public MessageType(String name, List<Type> fields) {
+    super(Repetition.REPEATED, name, fields);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void accept(TypeVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  /**
+   * Writes "message", the name, the optional original type and the members.
+   * The indent argument is not used: the header is never indented.
+   */
+  @Override
+  public void writeToStringBuilder(StringBuilder sb, String indent) {
+    final String annotation = getOriginalType() == null ? "" : " (" + getOriginalType() +")";
+    sb.append("message ");
+    sb.append(getName());
+    sb.append(annotation);
+    sb.append(" {\n");
+    membersDisplayString(sb, " ");
+    sb.append("}\n");
+  }
+
+  /**
+   * @param path the field names leading from this message to a type
+   * @return the max repetition level that might be needed to encode the
+   * type at 'path'.
+   */
+  public int getMaxRepetitionLevel(String ... path) {
+    // the recursive count includes 1 for this REPEATED root; take it out
+    final int levelIncludingRoot = getMaxRepetitionLevel(path, 0);
+    return levelIncludingRoot - 1;
+  }
+
+  /**
+   * @param path the field names leading from this message to a type
+   * @return the max definition level that might be needed to encode the
+   * type at 'path'.
+   */
+  public int getMaxDefinitionLevel(String ... path) {
+    // the recursive count includes 1 for this non-REQUIRED root; take it out
+    final int levelIncludingRoot = getMaxDefinitionLevel(path, 0);
+    return levelIncludingRoot - 1;
+  }
+
+  /**
+   * @param path the field names leading from this message to a type
+   * @return the type found at 'path'
+   */
+  public Type getType(String ... path) {
+    return getType(path, 0);
+  }
+
+  /**
+   * @param path the field names leading from this message to a primitive type
+   * @return the descriptor (path, primitive type name, type length, max levels) of that column
+   */
+  public ColumnDescriptor getColumnDescription(String[] path) {
+    final int maxRep = getMaxRepetitionLevel(path);
+    final int maxDef = getMaxDefinitionLevel(path);
+    final PrimitiveType leaf = getType(path).asPrimitiveType();
+    return new ColumnDescriptor(
+        path, leaf.getPrimitiveTypeName(), leaf.getTypeLength(), maxRep, maxDef);
+  }
+
+  /**
+   * @return the paths produced by the fields of this message, starting at depth 0
+   */
+  public List<String[]> getPaths() {
+    return this.getPaths(0);
+  }
+
+  /**
+   * @return one column descriptor per path returned by {@link #getPaths()}, in the same order
+   */
+  public List<ColumnDescriptor> getColumns() {
+    final List<String[]> allPaths = getPaths();
+    final List<ColumnDescriptor> result = new ArrayList<ColumnDescriptor>(allPaths.size());
+    // TODO: optimize this
+    for (String[] columnPath : allPaths) {
+      result.add(getColumnDescription(columnPath));
+    }
+    return result;
+  }
+
+  /**
+   * @param subType the type expected to be contained in this message
+   * @throws InvalidRecordException if subType is not a MessageType
+   */
+  @Override
+  public void checkContains(Type subType) {
+    if (subType instanceof MessageType) {
+      checkGroupContains(subType);
+      return;
+    }
+    throw new InvalidRecordException(subType + " found: expected " + this);
+  }
+
+  /**
+   * @param converter the converter to apply to this message and its fields
+   * @return the result of converting this message from its converted fields
+   */
+  public <T> T convertWith(TypeConverter<T> converter) {
+    // the fields are converted with a path that holds only this message
+    final List<GroupType> rootPath = new ArrayList<GroupType>();
+    rootPath.add(this);
+    final List<T> convertedFields = convertChildren(rootPath, converter);
+    return converter.convertMessageType(this, convertedFields);
+  }
+
+  /**
+   * @param path the field names to follow from this message
+   * @return whether this message contains 'path'
+   */
+  public boolean containsPath(String[] path) {
+    return containsPath(path, 0);
+  }
+
+  /**
+   * @param toMerge the message to merge into this one
+   * @return the strict union of both messages
+   */
+  public MessageType union(MessageType toMerge) {
+    return union(toMerge, true);
+  }
+
+  /**
+   * @param toMerge the message to merge into this one
+   * @param strict should schema primitive types match
+   * @return a new message with the name of this one and the merged fields
+   */
+  public MessageType union(MessageType toMerge, boolean strict) {
+    final List<Type> mergedFields = mergeFields(toMerge, strict);
+    return new MessageType(this.getName(), mergedFields);
+  }
+
+}