You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ti...@apache.org on 2015/02/05 20:37:20 UTC

incubator-parquet-mr git commit: PARQUET-181: Scrooge Write Support (take two)

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 80417356f -> 668d031d7


PARQUET-181: Scrooge Write Support (take two)

This is similar to https://github.com/apache/incubator-parquet-mr/pull/43, but instead of making `ThriftWriteSupport` abstract, it keeps it around (but deprecated) and adds `AbstractThriftWriteSupport`. This is a little less elegant, but it seems to appease the semver overlords.

Author: Colin Marc <co...@gmail.com>

Closes #58 from colinmarc/scrooge-write-support-2 and squashes the following commits:

e2a0abd [Colin Marc] add write support to ParquetScroogeScheme
19cf1a8 [Colin Marc] Add ScroogeWriteSupport and ParquetScroogeOutputFormat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/668d031d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/668d031d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/668d031d

Branch: refs/heads/master
Commit: 668d031d7213d5e76cf39770ffce7f030c9bf056
Parents: 8041735
Author: Colin Marc <co...@gmail.com>
Authored: Thu Feb 5 11:37:06 2015 -0800
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Thu Feb 5 11:37:06 2015 -0800

----------------------------------------------------------------------
 .../parquet/cascading/ParquetTBaseScheme.java   |   6 +-
 .../scrooge/ParquetScroogeOutputFormat.java     |  39 ++++++
 .../parquet/scrooge/ParquetScroogeScheme.java   |  22 ++--
 .../parquet/scrooge/ScroogeWriteSupport.java    |  65 ++++++++++
 .../scrooge/ParquetScroogeSchemeTest.java       |  78 ++++++++++++
 parquet-scrooge/src/test/resources/names.txt    |   3 +
 .../thrift/AbstractThriftWriteSupport.java      | 126 +++++++++++++++++++
 .../thrift/ParquetThriftBytesOutputFormat.java  |   4 +-
 .../hadoop/thrift/TBaseWriteSupport.java        |  63 ++++++++++
 .../hadoop/thrift/ThriftBytesWriteSupport.java  |  10 +-
 .../hadoop/thrift/ThriftWriteSupport.java       |  82 +++---------
 .../java/parquet/thrift/ThriftMetaData.java     |   3 -
 .../parquet/thrift/ThriftParquetWriter.java     |   8 +-
 13 files changed, 409 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
index 127a7dc..41f8c4f 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
@@ -30,7 +30,7 @@ import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.mapred.DeprecatedParquetInputFormat;
 import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
 import parquet.hadoop.thrift.ThriftReadSupport;
-import parquet.hadoop.thrift.ThriftWriteSupport;
+import parquet.hadoop.thrift.TBaseWriteSupport;
 import parquet.thrift.TBaseRecordConverter;
 
 public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
@@ -74,7 +74,7 @@ public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme
     }
 
     jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
-    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, ThriftWriteSupport.class);
-    ThriftWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
+    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
+    TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeOutputFormat.java b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeOutputFormat.java
new file mode 100644
index 0000000..98dc2b3
--- /dev/null
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeOutputFormat.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.scrooge;
+
+import com.twitter.scrooge.ThriftStruct;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.ParquetOutputFormat;
+
+/**
+ * Output format that writes Scrooge records to parquet through a {@link ScroogeWriteSupport}
+ * @param <T>  Type of Scrooge records to write
+ */
+public class ParquetScroogeOutputFormat<T extends ThriftStruct> extends ParquetOutputFormat<T> {
+
+  public static void setScroogeClass(Configuration configuration, Class<? extends ThriftStruct> thriftClass) { // delegates to ScroogeWriteSupport
+    ScroogeWriteSupport.setScroogeClass(configuration, thriftClass);
+  }
+
+  public static Class<? extends ThriftStruct> getScroogeClass(Configuration configuration) { // delegates to ScroogeWriteSupport
+    return ScroogeWriteSupport.getScroogeClass(configuration);
+  }
+
+  public ParquetScroogeOutputFormat() {
+    super(new ScroogeWriteSupport<T>()); // no class given here: the write support resolves it from the configuration in init(Configuration)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
index 7d4ff05..3abe957 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
@@ -32,7 +32,9 @@ import cascading.tap.Tap;
 import parquet.cascading.ParquetValueScheme;
 import parquet.filter2.predicate.FilterPredicate;
 import parquet.hadoop.ParquetInputFormat;
+import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
 import parquet.hadoop.thrift.ThriftReadSupport;
 
 public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueScheme<T> {
@@ -52,17 +54,13 @@ public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueSc
   }
 
   @Override
-  public void sinkConfInit(FlowProcess<JobConf> arg0,
-      Tap<JobConf, RecordReader, OutputCollector> arg1, JobConf arg2) {
-    throw new UnsupportedOperationException("ParquetScroogeScheme does not support Sinks");
+  public void sinkConfInit(FlowProcess<JobConf> fp,
+      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    ParquetOutputFormat.setWriteSupportClass(jobConf, ScroogeWriteSupport.class);
+    ScroogeWriteSupport.setScroogeClass(jobConf, this.config.getKlass());
   }
 
-  /**
-   * TODO: currently we cannot write Parquet files from Scrooge objects.
-   */
-  @Override
-  public boolean isSink() { return false; }
-
   @Override
   public void sourceConfInit(FlowProcess<JobConf> fp,
       Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
@@ -71,10 +69,4 @@ public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueSc
     ParquetInputFormat.setReadSupportClass(jobConf, ScroogeReadSupport.class);
     ThriftReadSupport.setRecordConverterClass(jobConf, ScroogeRecordConverter.class);
   }
-
-  @Override
-  public void sink(FlowProcess<JobConf> arg0, SinkCall<Object[], OutputCollector> arg1)
-      throws IOException {
-    throw new UnsupportedOperationException("ParquetScroogeScheme does not support Sinks");
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeWriteSupport.java b/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeWriteSupport.java
new file mode 100644
index 0000000..6e1300e
--- /dev/null
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeWriteSupport.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.scrooge;
+
+import com.twitter.scrooge.ThriftStruct;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TException;
+
+import parquet.hadoop.thrift.AbstractThriftWriteSupport;
+import parquet.io.ParquetEncodingException;
+import parquet.thrift.struct.ThriftType.StructType;
+
+/**
+ * Write support for Scrooge-generated thrift structs
+ */
+public class ScroogeWriteSupport<T extends ThriftStruct> extends AbstractThriftWriteSupport<T> {
+  public static void setScroogeClass(Configuration configuration, Class<? extends ThriftStruct> thriftClass) {
+    AbstractThriftWriteSupport.setGenericThriftClass(configuration, thriftClass);
+  }
+
+  public static Class<? extends ThriftStruct> getScroogeClass(Configuration configuration) {
+    return (Class<? extends ThriftStruct>)AbstractThriftWriteSupport.getGenericThriftClass(configuration); // unchecked cast: nothing here verifies the configured class is a ThriftStruct
+  }
+
+  /**
+   * used from hadoop
+   * the configuration must contain a "parquet.thrift.class" setting
+   * (see ScroogeWriteSupport#setScroogeClass)
+   */
+  public ScroogeWriteSupport() {
+  }
+
+  public ScroogeWriteSupport(Class<T> thriftClass) {
+    super(thriftClass);
+  }
+
+  @Override
+  protected StructType getThriftStruct() {
+    ScroogeStructConverter schemaConverter = new ScroogeStructConverter();
+    return schemaConverter.convert(thriftClass); // thriftClass is the inherited field assigned by init(Class) before this hook is called
+  }
+
+  @Override
+  public void write(T record) {
+    try {
+      record.write(parquetWriteProtocol); // parquetWriteProtocol is created by the inherited prepareForWrite
+    } catch (TException e) {
+      throw new ParquetEncodingException(e); // rethrown unchecked, keeping the TException as the cause
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java b/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
index 98c8d73..bbc3fe4 100644
--- a/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
+++ b/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
@@ -32,6 +32,7 @@ import cascading.tap.Tap;
 import cascading.tap.hadoop.Hfs;
 import cascading.tuple.Fields;
 import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,6 +52,8 @@ import parquet.scrooge.test.TestPersonWithAllInformation;
 import parquet.thrift.test.Address;
 import parquet.thrift.test.Phone;
 import parquet.thrift.test.RequiredPrimitiveFixture;
+import parquet.scrooge.test.Name;
+import parquet.scrooge.test.Name$;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -59,6 +62,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import scala.Option;
 
 import static org.junit.Assert.assertEquals;
 
@@ -156,4 +160,78 @@ public class ParquetScroogeSchemeTest {
       fs.delete(p, true);
     }
   }
+
+  final String txtInputPath = "src/test/resources/names.txt";
+  final String parquetOutputPath = "target/test/ParquetScroogeScheme/names-parquet-out";
+  final String txtOutputPath = "target/test/ParquetScroogeScheme/names-txt-out";
+
+  @Test
+  public void testWriteThenRead() throws Exception {
+    doWrite(); // text input -> parquet, through the scheme used as a sink
+    doRead(); // parquet written by doWrite -> text, then compared with the expected content
+  }
+
+  private void doWrite() throws Exception { // copies txtInputPath into parquetOutputPath using ParquetScroogeScheme as the sink
+    Path path = new Path(parquetOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true); // remove output left over from a previous run
+
+    Scheme sourceScheme = new TextLine( new Fields( "first", "last" ) );
+    Tap source = new Hfs(sourceScheme, txtInputPath);
+
+    Scheme sinkScheme = new ParquetScroogeScheme<Name>(Name.class);
+    Tap sink = new Hfs(sinkScheme, parquetOutputPath);
+
+    Pipe assembly = new Pipe( "namecp" );
+    assembly = new Each(assembly, new PackThriftFunction()); // turns each text tuple into a Name record
+    Flow flow  = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+  }
+
+  private void doRead() throws Exception { // reads parquetOutputPath back through ParquetScroogeScheme and checks the text output
+    Path path = new Path(txtOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true); // remove output left over from a previous run
+
+    Scheme sourceScheme = new ParquetScroogeScheme<Name>(Name.class);
+    Tap source = new Hfs(sourceScheme, parquetOutputPath);
+
+    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+    Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+    Pipe assembly = new Pipe( "namecp" );
+    assembly = new Each(assembly, new UnpackThriftFunction()); // turns each Name record back into a two-value tuple
+    Flow flow  = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(txtOutputPath+"/part-00000"));
+    assertEquals("0\tAlice\tPractice\n15\tBob\tHope\n24\tCharlie\tHorse\n", result); // NOTE(review): 0/15/24 equal the byte offsets of the three lines in names.txt, presumably the TextLine offset field stored as firstName - confirm
+  }
+
+  private static class PackThriftFunction extends BaseOperation implements Function { // wraps the first two argument values in a Name record
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Name name = Name$.MODULE$.apply(arguments.getString(0), Option.apply(arguments.getString(1))); // built through the Scala companion object; the second value is wrapped in an Option
+
+      result.add(name);
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+
+  private static class UnpackThriftFunction extends BaseOperation implements Function { // flattens a Name record into (firstName, lastName)
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Name name = (Name) arguments.getObject(0);
+      result.add(name.firstName());
+      result.add(name.lastName().get()); // NOTE(review): get() assumes the optional lastName is always present in the test data - confirm
+      functionCall.getOutputCollector().add(result);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-scrooge/src/test/resources/names.txt
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/resources/names.txt b/parquet-scrooge/src/test/resources/names.txt
new file mode 100644
index 0000000..cf0d55e
--- /dev/null
+++ b/parquet-scrooge/src/test/resources/names.txt
@@ -0,0 +1,3 @@
+Alice	Practice
+Bob	Hope
+Charlie	Horse

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-thrift/src/main/java/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/AbstractThriftWriteSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
new file mode 100644
index 0000000..5fd32e8
--- /dev/null
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
@@ -0,0 +1,126 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop.thrift;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TBase;
+
+import com.twitter.elephantbird.pig.util.ThriftToPig;
+
+import parquet.Log;
+import parquet.hadoop.BadConfigurationException;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.RecordConsumer;
+import parquet.pig.PigMetaData;
+import parquet.schema.MessageType;
+import parquet.thrift.ParquetWriteProtocol;
+import parquet.thrift.ThriftMetaData;
+import parquet.thrift.ThriftSchemaConverter;
+import parquet.thrift.struct.ThriftType.StructType;
+
+
+public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
+  public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class"; // configuration key holding the thrift class name
+  private static final Log LOG = Log.getLog(AbstractThriftWriteSupport.class);
+
+  public static void setGenericThriftClass(Configuration configuration, Class<?> thriftClass) { // stores thriftClass.getName() under PARQUET_THRIFT_CLASS
+    configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
+  }
+
+  public static Class getGenericThriftClass(Configuration configuration) { // loads the class named under PARQUET_THRIFT_CLASS; BadConfigurationException if unset or not found
+    final String thriftClassName = configuration.get(PARQUET_THRIFT_CLASS);
+    if (thriftClassName == null) {
+      throw new BadConfigurationException("the thrift class conf is missing in job conf at " + PARQUET_THRIFT_CLASS);
+    }
+
+    try {
+      @SuppressWarnings("unchecked")
+      Class thriftClass = Class.forName(thriftClassName); // raw Class: no check is made on what type the configured class is
+      return thriftClass;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("the class "+thriftClassName+" in job conf at " + PARQUET_THRIFT_CLASS + " could not be found", e);
+    }
+  }
+
+  protected Class<T> thriftClass; // assigned in init(Class)
+  protected MessageType schema; // assigned in init(Class)
+  protected StructType thriftStruct; // assigned in init(Class)
+  protected ParquetWriteProtocol parquetWriteProtocol; // assigned in prepareForWrite
+  protected WriteContext writeContext; // assigned in init(Class); null until then
+
+  /**
+   * used from hadoop
+   * the configuration must contain the thrift class name under {@link #PARQUET_THRIFT_CLASS}
+   * @see AbstractThriftWriteSupport#setGenericThriftClass(Configuration, Class)
+   */
+  public AbstractThriftWriteSupport() {
+  }
+
+  /**
+   * @param thriftClass the thrift class used for writing values
+   */
+  public AbstractThriftWriteSupport(Class<T> thriftClass) {
+    init(thriftClass); // NOTE: this reaches the abstract getThriftStruct() before any subclass constructor body has run
+  }
+
+  protected void init(Class<T> thriftClass) {
+    this.thriftClass = thriftClass; // must be set first: the getThriftStruct() implementations read this field
+    this.thriftStruct = getThriftStruct();
+
+    ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
+    this.schema = thriftSchemaConverter.convert(thriftStruct);
+
+    final Map<String, String> extraMetaData = new ThriftMetaData(thriftClass.getName(), thriftStruct).toExtraMetaData();
+    // adding the Pig schema as it would have been mapped from thrift
+    // TODO: make this work for non-tbase types
+    if (isPigLoaded() && TBase.class.isAssignableFrom(thriftClass)) {
+      new PigMetaData(new ThriftToPig((Class<? extends TBase<?,?>>)thriftClass).toSchema()).addToMetaData(extraMetaData);
+    }
+
+    this.writeContext = new WriteContext(schema, extraMetaData);
+  }
+
+  protected boolean isPigLoaded() { // true when the Pig Schema class can be loaded; logs at info level and returns false otherwise
+    try {
+      Class.forName("org.apache.pig.impl.logicalLayer.schema.Schema");
+      return true;
+    } catch (ClassNotFoundException e) {
+      LOG.info("Pig is not loaded, pig metadata will not be written");
+      return false;
+    }
+  }
+
+  @Override
+  public WriteContext init(Configuration configuration) {
+    if (writeContext == null) { // not yet initialized, e.g. created through the no-arg constructor
+      init(getGenericThriftClass(configuration));
+    }
+    return writeContext;
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+    this.parquetWriteProtocol = new ParquetWriteProtocol(recordConsumer, columnIO, thriftStruct);
+  }
+
+  protected abstract StructType getThriftStruct(); // subclass hook called from init(Class) to describe thriftClass as a StructType
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-thrift/src/main/java/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
index 5a5dfbb..7d6bfb5 100644
--- a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
@@ -38,11 +38,11 @@ import parquet.thrift.FieldIgnoredHandler;
 public class ParquetThriftBytesOutputFormat extends ParquetOutputFormat<BytesWritable> {
 
   public static void setThriftClass(Job job, Class<? extends TBase<?, ?>> thriftClass) {
-    ThriftWriteSupport.setThriftClass(ContextUtil.getConfiguration(job), thriftClass);
+    TBaseWriteSupport.setThriftClass(ContextUtil.getConfiguration(job), thriftClass);
   }
 
   public static Class<? extends TBase<?,?>> getThriftClass(Job job) {
-    return ThriftWriteSupport.getThriftClass(ContextUtil.getConfiguration(job));
+    return TBaseWriteSupport.getThriftClass(ContextUtil.getConfiguration(job));
   }
 
   public static <U extends TProtocol> void setTProtocolClass(Job job, Class<U> tProtocolClass) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-thrift/src/main/java/parquet/hadoop/thrift/TBaseWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/TBaseWriteSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/TBaseWriteSupport.java
new file mode 100644
index 0000000..bbc0293
--- /dev/null
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/TBaseWriteSupport.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+
+import parquet.hadoop.BadConfigurationException;
+import parquet.io.ParquetEncodingException;
+import parquet.thrift.ThriftSchemaConverter;
+import parquet.thrift.struct.ThriftType.StructType;
+
+public class TBaseWriteSupport<T extends TBase<?, ?>> extends AbstractThriftWriteSupport<T> {
+
+  public static <U extends TBase<?,?>> void setThriftClass(Configuration configuration, Class<U> thriftClass) {
+    AbstractThriftWriteSupport.setGenericThriftClass(configuration, thriftClass);
+  }
+
+  public static Class<? extends TBase<?,?>> getThriftClass(Configuration configuration) {
+    return (Class<? extends TBase<?,?>>)AbstractThriftWriteSupport.getGenericThriftClass(configuration); // unchecked cast: nothing here verifies the configured class extends TBase
+  }
+
+  /**
+   * used from hadoop
+   * the configuration must contain a "parquet.thrift.class" setting
+   * @see TBaseWriteSupport#setThriftClass(Configuration, Class)
+   */
+  public TBaseWriteSupport() {
+  }
+
+  public TBaseWriteSupport(Class<T> thriftClass) {
+    super(thriftClass);
+  }
+
+  @Override
+  protected StructType getThriftStruct() {
+    ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
+    return thriftSchemaConverter.toStructType((Class<TBase<?, ?>>)thriftClass); // thriftClass is the inherited field assigned by init(Class) before this hook is called
+  }
+
+  @Override
+  public void write(T record) {
+    try {
+      record.write(parquetWriteProtocol); // parquetWriteProtocol is created by the inherited prepareForWrite
+    } catch (TException e) {
+      throw new ParquetEncodingException(e); // rethrown unchecked, keeping the TException as the cause
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftBytesWriteSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
index d4175d7..d585e71 100644
--- a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
@@ -67,10 +67,10 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
 
   private final boolean buffered;
   @SuppressWarnings("rawtypes") // TODO: fix type
-  private final ThriftWriteSupport<?> thriftWriteSupport = new ThriftWriteSupport();
+  private final TBaseWriteSupport<?> thriftWriteSupport = new TBaseWriteSupport();
   private ProtocolPipe readToWrite;
   private TProtocolFactory protocolFactory;
-  private Class<? extends TBase<?, ?>> thriftClass;
+  private Class<? extends TBase<?,?>> thriftClass;
   private MessageType schema;
   private StructType thriftStruct;
   private ParquetWriteProtocol parquetWriteProtocol;
@@ -104,13 +104,13 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
       }
     }
     if (thriftClass != null) {
-      ThriftWriteSupport.setThriftClass(configuration, thriftClass);
+      TBaseWriteSupport.setThriftClass(configuration, thriftClass);
     } else {
-      thriftClass = ThriftWriteSupport.getThriftClass(configuration);
+      thriftClass = TBaseWriteSupport.getThriftClass(configuration);
     }
     ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
     this.thriftStruct = thriftSchemaConverter.toStructType(thriftClass);
-    this.schema = thriftSchemaConverter.convert(thriftClass);
+    this.schema = thriftSchemaConverter.convert(thriftStruct);
     if (buffered) {
       readToWrite = new BufferedProtocolReadToWrite(thriftStruct, errorHandler);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java
index 4323493..982dcce 100644
--- a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java
@@ -24,49 +24,26 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 
-import com.twitter.elephantbird.pig.util.ThriftToPig;
-
-import parquet.Log;
-import parquet.hadoop.BadConfigurationException;
 import parquet.hadoop.api.WriteSupport;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.ParquetEncodingException;
 import parquet.io.api.RecordConsumer;
-import parquet.pig.PigMetaData;
-import parquet.schema.MessageType;
-import parquet.thrift.ParquetWriteProtocol;
-import parquet.thrift.ThriftMetaData;
-import parquet.thrift.ThriftSchemaConverter;
-import parquet.thrift.struct.ThriftType.StructType;
-
 
+/**
+ * @deprecated
+ * This class is replaced by TBaseWriteSupport.
+ */
+@Deprecated
 public class ThriftWriteSupport<T extends TBase<?,?>> extends WriteSupport<T> {
-  public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class";
-  private static final Log LOG = Log.getLog(ThriftWriteSupport.class);
+  public static final String PARQUET_THRIFT_CLASS = AbstractThriftWriteSupport.PARQUET_THRIFT_CLASS;
 
   public static <U extends TBase<?,?>> void setThriftClass(Configuration configuration, Class<U> thriftClass) {
-    configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
+    TBaseWriteSupport.setThriftClass(configuration, thriftClass);
   }
 
   public static Class<? extends TBase<?,?>> getThriftClass(Configuration configuration) {
-    final String thriftClassName = configuration.get(PARQUET_THRIFT_CLASS);
-    if (thriftClassName == null) {
-      throw new BadConfigurationException("the thrift class conf is missing in job conf at " + PARQUET_THRIFT_CLASS);
-    }
-    try {
-      @SuppressWarnings("unchecked")
-      Class<? extends TBase<?,?>> thriftClass = (Class<? extends TBase<?,?>>)Class.forName(thriftClassName);
-      return thriftClass;
-    } catch (ClassNotFoundException e) {
-      throw new BadConfigurationException("the class "+thriftClassName+" in job conf at " + PARQUET_THRIFT_CLASS + " could not be found", e);
-    }
+    return TBaseWriteSupport.getThriftClass(configuration);
   }
 
-  private MessageType schema;
-  private StructType thriftStruct;
-  private ParquetWriteProtocol parquetWriteProtocol;
-  private WriteContext writeContext;
+  private TBaseWriteSupport writeSupport;
 
   /**
    * used from hadoop
@@ -74,59 +51,28 @@ public class ThriftWriteSupport<T extends TBase<?,?>> extends WriteSupport<T> {
    * @see ThriftWriteSupport#setThriftClass(Configuration, Class)
    */
   public ThriftWriteSupport() {
+    this.writeSupport = new TBaseWriteSupport();
   }
 
   /**
    * @param thriftClass the thrift class used for writing values
    */
   public ThriftWriteSupport(Class<T> thriftClass) {
-    init(thriftClass);
-  }
-
-  private <S extends TBase<?, ?>> void init(Class<S> thriftClass) {
-    ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
-    this.thriftStruct = thriftSchemaConverter.toStructType(thriftClass);
-    this.schema = thriftSchemaConverter.convert(thriftClass);
-    final Map<String, String> extraMetaData = new ThriftMetaData(thriftClass.getName(), thriftStruct).toExtraMetaData();
-    // adding the Pig schema as it would have been mapped from thrift
-    if (isPigLoaded()) {
-      new PigMetaData(new ThriftToPig<S>(thriftClass).toSchema()).addToMetaData(extraMetaData);
-    }
-    writeContext = new WriteContext(schema, extraMetaData);
-  }
-
-  private boolean isPigLoaded() {
-    try {
-      Class.forName("org.apache.pig.impl.logicalLayer.schema.Schema");
-      return true;
-    } catch (ClassNotFoundException e) {
-      LOG.info("Pig is not loaded, pig metadata will not be written");
-      return false;
-    }
+    this.writeSupport = new TBaseWriteSupport(thriftClass);
   }
 
   @Override
   public WriteContext init(Configuration configuration) {
-    if (writeContext == null) {
-      init(getThriftClass(configuration));
-    }
-    return writeContext;
+    return this.writeSupport.init(configuration);
   }
 
   @Override
   public void prepareForWrite(RecordConsumer recordConsumer) {
-    final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-    this.parquetWriteProtocol = new ParquetWriteProtocol(recordConsumer, columnIO, thriftStruct);
+    this.writeSupport.prepareForWrite(recordConsumer);
   }
 
   @Override
   public void write(T record) {
-    try {
-      record.write(parquetWriteProtocol);
-    } catch (TException e) {
-      throw new ParquetEncodingException(e);
-    }
+    this.writeSupport.write(record);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-thrift/src/main/java/parquet/thrift/ThriftMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftMetaData.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftMetaData.java
index 1da52b4..b35b30e 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftMetaData.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftMetaData.java
@@ -71,9 +71,6 @@ public class ThriftMetaData {
   public static Class<?> getThriftClass(String thriftClassName) {
     try {
       Class<?> thriftClass = Class.forName(thriftClassName);
-      if (!TBase.class.isAssignableFrom(thriftClass)) {
-        throw new BadConfigurationException("Provided class " + thriftClassName + " does not extend TBase");
-      }
       return thriftClass;
     } catch (ClassNotFoundException e) {
       throw new BadConfigurationException("Could not instantiate thrift class " + thriftClassName, e);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/668d031d/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetWriter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetWriter.java
index 4446b10..1ff6ce0 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetWriter.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetWriter.java
@@ -26,7 +26,7 @@ import org.apache.thrift.TBase;
 
 import parquet.hadoop.ParquetWriter;
 import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.thrift.ThriftWriteSupport;
+import parquet.hadoop.thrift.TBaseWriteSupport;
 
 /**
  * To generate Parquet files using thrift
@@ -38,15 +38,15 @@ import parquet.hadoop.thrift.ThriftWriteSupport;
 public class ThriftParquetWriter<T extends TBase<?,?>> extends ParquetWriter<T> {
 
   public ThriftParquetWriter(Path file, Class<T> thriftClass, CompressionCodecName compressionCodecName) throws IOException {
-    super(file, new ThriftWriteSupport<T>(thriftClass), compressionCodecName, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
+    super(file, new TBaseWriteSupport<T>(thriftClass), compressionCodecName, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
   }
 
   public ThriftParquetWriter(Path file, Class<T> thriftClass, CompressionCodecName compressionCodecName, int blockSize, int pageSize, boolean enableDictionary, boolean validating) throws IOException {
-    super(file, new ThriftWriteSupport<T>(thriftClass), compressionCodecName, blockSize, pageSize, enableDictionary, validating);
+    super(file, new TBaseWriteSupport<T>(thriftClass), compressionCodecName, blockSize, pageSize, enableDictionary, validating);
   }
 
   public ThriftParquetWriter(Path file, Class<T> thriftClass, CompressionCodecName compressionCodecName, int blockSize, int pageSize, boolean enableDictionary, boolean validating, Configuration conf) throws IOException {
-    super(file, new ThriftWriteSupport<T>(thriftClass), compressionCodecName,
+    super(file, new TBaseWriteSupport<T>(thriftClass), compressionCodecName,
         blockSize, pageSize, pageSize, enableDictionary, validating,
         DEFAULT_WRITER_VERSION, conf);
   }