Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:43 UTC

[28/50] [abbrv] beam git commit: Implements HCatalogIO

Implements HCatalogIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/349898c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/349898c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/349898c4

Branch: refs/heads/gearpump-runner
Commit: 349898c4702fc3e52d8c0cd1c5a04f14cd40fd27
Parents: 7c5a70e
Author: Seshadri Chakkravarthy <se...@gmail.com>
Authored: Thu May 18 12:07:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jun 8 14:05:38 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hcatalog/pom.xml                   | 163 ++++++
 .../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 511 +++++++++++++++++++
 .../beam/sdk/io/hcatalog/package-info.java      |  22 +
 .../io/hcatalog/EmbeddedMetastoreService.java   |  88 ++++
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java    | 271 ++++++++++
 .../sdk/io/hcatalog/HCatalogIOTestUtils.java    | 106 ++++
 .../hcatalog/src/test/resources/hive-site.xml   | 301 +++++++++++
 sdks/java/io/pom.xml                            |   1 +
 8 files changed, 1463 insertions(+)
----------------------------------------------------------------------
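
For readers of this commit, here is a minimal usage sketch assembled from the Javadoc that this change adds in HCatalogIO.java. The metastore URI/port, database, and table names below are placeholders, and the Pipeline object and imports (java.util.*, Beam SDK, HCatalog classes) are assumed rather than part of the change:

    Map<String, String> configProperties = new HashMap<>();
    configProperties.put("hive.metastore.uris", "thrift://metastore-host:9083"); // placeholder URI

    // Read HCatRecords from an existing table
    // (Read is a PTransform<PBegin, PCollection<HCatRecord>>).
    PCollection<HCatRecord> records =
        pipeline.apply(
            HCatalogIO.read()
                .withConfigProperties(configProperties)
                .withDatabase("default")
                .withTable("employee"));

    // Write HCatRecords to a pre-existing destination table
    // (the transform does not create the table).
    records.apply(
        HCatalogIO.write()
            .withConfigProperties(configProperties)
            .withDatabase("default")
            .withTable("employee_copy") // placeholder destination table
            .withBatchSize(1024L));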


http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/pom.xml b/sdks/java/io/hcatalog/pom.xml
new file mode 100644
index 0000000..19b62a5
--- /dev/null
+++ b/sdks/java/io/hcatalog/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-hcatalog</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: HCatalog</name>
+  <description>IO to read and write data using HCatalog.</description>
+
+  <properties>
+    <hive.version>2.1.0</hive.version>
+    <apache.commons.version>2.5</apache.commons.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${apache.commons.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <version>${hive.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
new file mode 100644
index 0000000..07b56e3
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.HCatWriter;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.hive.hcatalog.data.transfer.WriteEntity;
+import org.apache.hive.hcatalog.data.transfer.WriterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data using HCatalog.
+ *
+ * <h3>Reading using HCatalog</h3>
+ *
+ * <p>HCatalog source supports reading of HCatRecord from an HCatalog-managed source, e.g. Hive.
+ *
+ * <p>To configure an HCatalog source, you must specify a metastore URI and a table name. Other
+ * optional parameters are database &amp; filter. For instance:
+ *
+ * <pre>{@code
+ * Map<String, String> configProperties = new HashMap<String, String>();
+ * configProperties.put("hive.metastore.uris","thrift://metastore-host:port");
+ *
+ * pipeline
+ *   .apply(HCatalogIO.read()
+ *       .withConfigProperties(configProperties) //mandatory
+ *       .withTable("employee") //mandatory
+ *       .withDatabase("default") //optional, assumes default if none specified
+ *       .withFilter(filterString)); //optional, should be specified if the table is partitioned
+ * }</pre>
+ *
+ * <h3>Writing using HCatalog</h3>
+ *
+ * <p>HCatalog sink supports writing of HCatRecord to an HCatalog-managed sink, e.g. Hive.
+ *
+ * <p>To configure an HCatalog sink, you must specify a metastore URI and a table name. Other
+ * optional parameters are database, partition &amp; batchsize. The destination table should exist
+ * beforehand; the transform does not create a new table if it does not exist. For instance:
+ *
+ * <pre>{@code
+ * Map<String, String> configProperties = new HashMap<String, String>();
+ * configProperties.put("hive.metastore.uris","thrift://metastore-host:port");
+ *
+ * pipeline
+ *   .apply(...)
+ *   .apply(HCatalogIO.write()
+ *       .withConfigProperties(configProperties) //mandatory
+ *       .withTable("employee") //mandatory
+ *       .withDatabase("default") //optional, assumes default if none specified
+ *       .withFilter(partitionValues) //optional, should be specified if the table is partitioned
+ *       .withBatchSize(1024L)); //optional, assumes a default batch size of 1024 if none specified
+ * }</pre>
+ */
+@Experimental
+public class HCatalogIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HCatalogIO.class);
+
+  /** Write data to Hive. */
+  public static Write write() {
+    return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(1024L).build();
+  }
+
+  /** Read data from Hive. */
+  public static Read read() {
+    return new AutoValue_HCatalogIO_Read.Builder().setDatabase("default").build();
+  }
+
+  private HCatalogIO() {}
+
+  /** A {@link PTransform} to read data using HCatalog. */
+  @VisibleForTesting
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
+    @Nullable
+    abstract Map<String, String> getConfigProperties();
+
+    @Nullable
+    abstract String getDatabase();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getFilter();
+
+    @Nullable
+    abstract ReaderContext getContext();
+
+    @Nullable
+    abstract Integer getSplitId();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfigProperties(Map<String, String> configProperties);
+
+      abstract Builder setDatabase(String database);
+
+      abstract Builder setTable(String table);
+
+      abstract Builder setFilter(String filter);
+
+      abstract Builder setSplitId(Integer splitId);
+
+      abstract Builder setContext(ReaderContext context);
+
+      abstract Read build();
+    }
+
+    /** Sets the configuration properties like metastore URI. This is mandatory */
+    public Read withConfigProperties(Map<String, String> configProperties) {
+      return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
+    }
+
+    /** Sets the database name. This is optional, assumes 'default' database if none specified */
+    public Read withDatabase(String database) {
+      return toBuilder().setDatabase(database).build();
+    }
+
+    /** Sets the table name to read from. This is mandatory */
+    public Read withTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /** Sets the filter (partition) details. This is optional, assumes none if not specified */
+    public Read withFilter(String filter) {
+      return toBuilder().setFilter(filter).build();
+    }
+
+    Read withSplitId(int splitId) {
+      checkArgument(splitId >= 0, "Invalid split id-" + splitId);
+      return toBuilder().setSplitId(splitId).build();
+    }
+
+    Read withContext(ReaderContext context) {
+      return toBuilder().setContext(context).build();
+    }
+
+    @Override
+    public PCollection<HCatRecord> expand(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      checkNotNull(getTable(), "table");
+      checkNotNull(getConfigProperties(), "configProperties");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configProperties", getConfigProperties().toString()));
+      builder.add(DisplayData.item("table", getTable()));
+      builder.addIfNotNull(DisplayData.item("database", getDatabase()));
+      builder.addIfNotNull(DisplayData.item("filter", getFilter()));
+    }
+  }
+
+  /** A HCatalog {@link BoundedSource} reading {@link HCatRecord} from a given instance. */
+  @VisibleForTesting
+  static class BoundedHCatalogSource extends BoundedSource<HCatRecord> {
+    private Read spec;
+
+    BoundedHCatalogSource(Read spec) {
+      this.spec = spec;
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public Coder<HCatRecord> getDefaultOutputCoder() {
+      return (Coder) WritableCoder.of(DefaultHCatRecord.class);
+    }
+
+    @Override
+    public void validate() {
+      spec.validate(null);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      spec.populateDisplayData(builder);
+    }
+
+    @Override
+    public BoundedReader<HCatRecord> createReader(PipelineOptions options) {
+      return new BoundedHCatalogReader(this);
+    }
+
+    /**
+     * Returns the size of the table in bytes; it does not take into consideration any
+     * filter/partition details passed.
+     */
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
+      Configuration conf = new Configuration();
+      for (Entry<String, String> entry : spec.getConfigProperties().entrySet()) {
+        conf.set(entry.getKey(), entry.getValue());
+      }
+      IMetaStoreClient client = null;
+      try {
+        HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+        client = HCatUtil.getHiveMetastoreClient(hiveConf);
+        Table table = HCatUtil.getTable(client, spec.getDatabase(), spec.getTable());
+        return StatsUtils.getFileSizeForTable(hiveConf, table);
+      } finally {
+        // IMetaStoreClient is not AutoCloseable, closing it manually
+        if (client != null) {
+          client.close();
+        }
+      }
+    }
+
+    /**
+     * Calculates the 'desired' number of splits based on desiredBundleSizeBytes, which is passed
+     * as a hint to the native API. Returns the actual splits generated by the native API, which
+     * could differ from the 'desired' split count calculated using desiredBundleSizeBytes.
+     */
+    @Override
+    public List<BoundedSource<HCatRecord>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      int desiredSplitCount = 1;
+      long estimatedSizeBytes = getEstimatedSizeBytes(options);
+      if (desiredBundleSizeBytes > 0 && estimatedSizeBytes > 0) {
+        desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
+      }
+      ReaderContext readerContext = getReaderContext(desiredSplitCount);
+      //process the splits returned by native API
+      //this could be different from 'desiredSplitCount' calculated above
+      LOG.info(
+          "Splitting into bundles of {} bytes: "
+              + "estimated size {}, desired split count {}, actual split count {}",
+          desiredBundleSizeBytes,
+          estimatedSizeBytes,
+          desiredSplitCount,
+          readerContext.numSplits());
+
+      List<BoundedSource<HCatRecord>> res = new ArrayList<>();
+      for (int split = 0; split < readerContext.numSplits(); split++) {
+        res.add(new BoundedHCatalogSource(spec.withContext(readerContext).withSplitId(split)));
+      }
+      return res;
+    }
+
+    private ReaderContext getReaderContext(long desiredSplitCount) throws HCatException {
+      ReadEntity entity =
+          new ReadEntity.Builder()
+              .withDatabase(spec.getDatabase())
+              .withTable(spec.getTable())
+              .withFilter(spec.getFilter())
+              .build();
+      // pass the 'desired' split count as a hint to the API
+      Map<String, String> configProps = new HashMap<>(spec.getConfigProperties());
+      configProps.put(
+          HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount));
+      return DataTransferFactory.getHCatReader(entity, configProps).prepareRead();
+    }
+
+    static class BoundedHCatalogReader extends BoundedSource.BoundedReader<HCatRecord> {
+      private final BoundedHCatalogSource source;
+      private HCatRecord current;
+      private Iterator<HCatRecord> hcatIterator;
+
+      public BoundedHCatalogReader(BoundedHCatalogSource source) {
+        this.source = source;
+      }
+
+      @Override
+      public boolean start() throws HCatException {
+        HCatReader reader =
+            DataTransferFactory.getHCatReader(source.spec.getContext(), source.spec.getSplitId());
+        hcatIterator = reader.read();
+        return advance();
+      }
+
+      @Override
+      public boolean advance() {
+        if (hcatIterator.hasNext()) {
+          current = hcatIterator.next();
+          return true;
+        } else {
+          current = null;
+          return false;
+        }
+      }
+
+      @Override
+      public BoundedHCatalogSource getCurrentSource() {
+        return source;
+      }
+
+      @Override
+      public HCatRecord getCurrent() {
+        if (current == null) {
+          throw new NoSuchElementException("Current element is null");
+        }
+        return current;
+      }
+
+      @Override
+      public void close() {
+        // nothing to close/release
+      }
+    }
+  }
+
+  /** A {@link PTransform} to write to a HCatalog managed source. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<HCatRecord>, PDone> {
+    @Nullable
+    abstract Map<String, String> getConfigProperties();
+
+    @Nullable
+    abstract String getDatabase();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract Map getFilter();
+
+    abstract long getBatchSize();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfigProperties(Map<String, String> configProperties);
+
+      abstract Builder setDatabase(String database);
+
+      abstract Builder setTable(String table);
+
+      abstract Builder setFilter(Map partition);
+
+      abstract Builder setBatchSize(long batchSize);
+
+      abstract Write build();
+    }
+
+    /** Sets the configuration properties like metastore URI. This is mandatory */
+    public Write withConfigProperties(Map<String, String> configProperties) {
+      return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
+    }
+
+    /** Sets the database name. This is optional, assumes 'default' database if none specified */
+    public Write withDatabase(String database) {
+      return toBuilder().setDatabase(database).build();
+    }
+
+    /** Sets the table name to write to, the table should exist beforehand. This is mandatory */
+    public Write withTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /** Sets the filter (partition) details. This is required if the table is partitioned */
+    public Write withFilter(Map filter) {
+      return toBuilder().setFilter(filter).build();
+    }
+
+    /**
+     * Sets batch size for the write operation. This is optional, assumes a default batch size of
+     * 1024 if not set
+     */
+    public Write withBatchSize(long batchSize) {
+      return toBuilder().setBatchSize(batchSize).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<HCatRecord> input) {
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      checkNotNull(getConfigProperties(), "configProperties");
+      checkNotNull(getTable(), "table");
+    }
+
+    private static class WriteFn extends DoFn<HCatRecord, Void> {
+      private final Write spec;
+      private WriterContext writerContext;
+      private HCatWriter slaveWriter;
+      private HCatWriter masterWriter;
+      private List<HCatRecord> hCatRecordsBatch;
+
+      public WriteFn(Write spec) {
+        this.spec = spec;
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.addIfNotNull(DisplayData.item("database", spec.getDatabase()));
+        builder.add(DisplayData.item("table", spec.getTable()));
+        builder.addIfNotNull(DisplayData.item("filter", String.valueOf(spec.getFilter())));
+        builder.add(DisplayData.item("configProperties", spec.getConfigProperties().toString()));
+        builder.add(DisplayData.item("batchSize", spec.getBatchSize()));
+      }
+
+      @Setup
+      public void initiateWrite() throws HCatException {
+        WriteEntity entity =
+            new WriteEntity.Builder()
+                .withDatabase(spec.getDatabase())
+                .withTable(spec.getTable())
+                .withPartition(spec.getFilter())
+                .build();
+        masterWriter = DataTransferFactory.getHCatWriter(entity, spec.getConfigProperties());
+        writerContext = masterWriter.prepareWrite();
+        slaveWriter = DataTransferFactory.getHCatWriter(writerContext);
+      }
+
+      @StartBundle
+      public void startBundle() {
+        hCatRecordsBatch = new ArrayList<>();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext ctx) throws HCatException {
+        hCatRecordsBatch.add(ctx.element());
+        if (hCatRecordsBatch.size() >= spec.getBatchSize()) {
+          flush();
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws HCatException {
+        flush();
+      }
+
+      private void flush() throws HCatException {
+        if (hCatRecordsBatch.isEmpty()) {
+          return;
+        }
+        try {
+          slaveWriter.write(hCatRecordsBatch.iterator());
+          masterWriter.commit(writerContext);
+        } catch (HCatException e) {
+          LOG.error("Exception in flush - write/commit data to Hive", e);
+          //abort on exception
+          masterWriter.abort(writerContext);
+          throw e;
+        } finally {
+          hCatRecordsBatch.clear();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java
new file mode 100644
index 0000000..dff5bd1
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing using HCatalog.
+ */
+package org.apache.beam.sdk.io.hcatalog;

http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
new file mode 100644
index 0000000..5792bf6
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import static org.apache.hive.hcatalog.common.HCatUtil.makePathASafeFileName;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * Implementation of a light-weight embedded metastore. This class is a trimmed-down version of
+ * <a href="https://github.com/apache/hive/blob/master/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java">
+ * HCatBaseTest.java</a> in the Apache Hive repository.
+ */
+public final class EmbeddedMetastoreService implements AutoCloseable {
+  private final Driver driver;
+  private final HiveConf hiveConf;
+  private final SessionState sessionState;
+
+  EmbeddedMetastoreService(String baseDirPath) throws IOException {
+    FileUtils.forceDeleteOnExit(new File(baseDirPath));
+
+    String hiveDirPath = makePathASafeFileName(baseDirPath + "/hive");
+    String testDataDirPath =
+        makePathASafeFileName(
+            hiveDirPath
+                + "/data/"
+                + EmbeddedMetastoreService.class.getCanonicalName()
+                + System.currentTimeMillis());
+    String testWarehouseDirPath = makePathASafeFileName(testDataDirPath + "/warehouse");
+
+    hiveConf = new HiveConf(getClass());
+    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, testWarehouseDirPath);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
+    hiveConf.setVar(
+        HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd."
+            + "SQLStdHiveAuthorizerFactory");
+    hiveConf.set("test.tmp.dir", hiveDirPath);
+
+    System.setProperty("derby.stream.error.file", "/dev/null");
+    driver = new Driver(hiveConf);
+    sessionState = SessionState.start(new CliSessionState(hiveConf));
+  }
+
+  /** Executes the passed query on the embedded metastore service. */
+  void executeQuery(String query) throws CommandNeedRetryException {
+    driver.run(query);
+  }
+
+  /** Returns the HiveConf object for the embedded metastore. */
+  HiveConf getHiveConf() {
+    return hiveConf;
+  }
+
+  @Override
+  public void close() throws Exception {
+    driver.close();
+    sessionState.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
new file mode 100644
index 0000000..49c538f
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE_NAME;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getConfigPropertiesAsMap;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getExpectedRecords;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getHCatRecords;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getReaderContext;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.insertTestData;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hcatalog.HCatalogIO.BoundedHCatalogSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/** Test for HCatalogIO. */
+public class HCatalogIOTest implements Serializable {
+  public static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+  @Rule public final transient TestPipeline defaultPipeline = TestPipeline.create();
+
+  @Rule public final transient TestPipeline readAfterWritePipeline = TestPipeline.create();
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public final transient TestRule testDataSetupRule =
+      new TestWatcher() {
+        public Statement apply(final Statement base, final Description description) {
+          return new Statement() {
+            @Override
+            public void evaluate() throws Throwable {
+              if (description.getAnnotation(NeedsTestData.class) != null) {
+                prepareTestData();
+              } else if (description.getAnnotation(NeedsEmptyTestTables.class) != null) {
+                reCreateTestTable();
+              }
+              base.evaluate();
+            }
+          };
+        }
+      };
+
+  private static EmbeddedMetastoreService service;
+
+  /** Use this annotation to set up complete test data (a table populated with records). */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  @interface NeedsTestData {}
+
+  /** Use this annotation to set up test tables alone (empty tables, no records populated). */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  @interface NeedsEmptyTestTables {}
+
+  @BeforeClass
+  public static void setupEmbeddedMetastoreService() throws IOException {
+    service = new EmbeddedMetastoreService(TMP_FOLDER.getRoot().getAbsolutePath());
+  }
+
+  @AfterClass
+  public static void shutdownEmbeddedMetastoreService() throws Exception {
+    service.executeQuery("drop table " + TEST_TABLE_NAME);
+    service.close();
+  }
+
+  /** Perform end-to-end test of Write-then-Read operation. */
+  @Test
+  @NeedsEmptyTestTables
+  public void testWriteThenReadSuccess() throws Exception {
+    defaultPipeline
+        .apply(Create.of(getHCatRecords(TEST_RECORDS_COUNT)))
+        .apply(
+            HCatalogIO.write()
+                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                .withTable(TEST_TABLE_NAME));
+    defaultPipeline.run();
+
+    PCollection<String> output =
+        readAfterWritePipeline
+            .apply(
+                HCatalogIO.read()
+                    .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                    .withTable(HCatalogIOTestUtils.TEST_TABLE_NAME))
+            .apply(
+                ParDo.of(
+                    new DoFn<HCatRecord, String>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        c.output(c.element().get(0).toString());
+                      }
+                    }));
+    PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT));
+    readAfterWritePipeline.run();
+  }
+
+  /** Test of Write to a non-existent table. */
+  @Test
+  public void testWriteFailureTableDoesNotExist() throws Exception {
+    thrown.expectCause(isA(UserCodeException.class));
+    thrown.expectMessage(containsString("org.apache.hive.hcatalog.common.HCatException"));
+    thrown.expectMessage(containsString("NoSuchObjectException"));
+    defaultPipeline
+        .apply(Create.of(getHCatRecords(TEST_RECORDS_COUNT)))
+        .apply(
+            HCatalogIO.write()
+                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                .withTable("myowntable"));
+    defaultPipeline.run();
+  }
+
+  /** Test of Write without specifying a table. */
+  @Test
+  public void testWriteFailureValidationTable() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage(containsString("table"));
+    HCatalogIO.write()
+        .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+        .validate(null);
+  }
+
+  /** Test of Write without specifying configuration properties. */
+  @Test
+  public void testWriteFailureValidationConfigProp() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage(containsString("configProperties"));
+    HCatalogIO.write().withTable("myowntable").validate(null);
+  }
+
+  /** Test of Read from a non-existent table. */
+  @Test
+  public void testReadFailureTableDoesNotExist() throws Exception {
+    defaultPipeline.apply(
+        HCatalogIO.read()
+            .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+            .withTable("myowntable"));
+    thrown.expectCause(isA(NoSuchObjectException.class));
+    defaultPipeline.run();
+  }
+
+  /** Test of Read without specifying configuration properties. */
+  @Test
+  public void testReadFailureValidationConfig() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage(containsString("configProperties"));
+    HCatalogIO.read().withTable("myowntable").validate(null);
+  }
+
+  /** Test of Read without specifying a table. */
+  @Test
+  public void testReadFailureValidationTable() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage(containsString("table"));
+    HCatalogIO.read()
+        .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+        .validate(null);
+  }
+
+  /** Test of Read using SourceTestUtils.readFromSource(..). */
+  @Test
+  @NeedsTestData
+  public void testReadFromSource() throws Exception {
+    ReaderContext context = getReaderContext(getConfigPropertiesAsMap(service.getHiveConf()));
+    HCatalogIO.Read spec =
+        HCatalogIO.read()
+            .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+            .withContext(context)
+            .withTable(TEST_TABLE_NAME);
+
+    List<String> records = new ArrayList<>();
+    for (int i = 0; i < context.numSplits(); i++) {
+      BoundedHCatalogSource source = new BoundedHCatalogSource(spec.withSplitId(i));
+      for (HCatRecord record : SourceTestUtils.readFromSource(source, OPTIONS)) {
+        records.add(record.get(0).toString());
+      }
+    }
+    assertThat(records, containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT).toArray()));
+  }
+
+  /** Test of Read using SourceTestUtils.assertSourcesEqualReferenceSource(..). */
+  @Test
+  @NeedsTestData
+  public void testSourceEqualsSplits() throws Exception {
+    final int numRows = 1500;
+    final int numSamples = 10;
+    final long bytesPerRow = 15;
+    ReaderContext context = getReaderContext(getConfigPropertiesAsMap(service.getHiveConf()));
+    HCatalogIO.Read spec =
+        HCatalogIO.read()
+            .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+            .withContext(context)
+            .withTable(TEST_TABLE_NAME);
+
+    BoundedHCatalogSource source = new BoundedHCatalogSource(spec);
+    List<BoundedSource<HCatRecord>> unSplitSource = source.split(-1, OPTIONS);
+    assertEquals(1, unSplitSource.size());
+
+    List<BoundedSource<HCatRecord>> splits =
+        source.split(numRows * bytesPerRow / numSamples, OPTIONS);
+    assertTrue(splits.size() >= 1);
+
+    SourceTestUtils.assertSourcesEqualReferenceSource(unSplitSource.get(0), splits, OPTIONS);
+  }
+
+  private void reCreateTestTable() throws CommandNeedRetryException {
+    service.executeQuery("drop table " + TEST_TABLE_NAME);
+    service.executeQuery("create table " + TEST_TABLE_NAME + "(mycol1 string, mycol2 int)");
+  }
+
+  private void prepareTestData() throws Exception {
+    reCreateTestTable();
+    insertTestData(getConfigPropertiesAsMap(service.getHiveConf()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
new file mode 100644
index 0000000..f66e0bc
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.hive.hcatalog.data.transfer.WriteEntity;
+import org.apache.hive.hcatalog.data.transfer.WriterContext;
+
+/** Utility class for HCatalogIOTest. */
+public class HCatalogIOTestUtils {
+  public static final String TEST_TABLE_NAME = "mytable";
+
+  public static final int TEST_RECORDS_COUNT = 1000;
+
+  private static final ReadEntity READ_ENTITY =
+      new ReadEntity.Builder().withTable(TEST_TABLE_NAME).build();
+  private static final WriteEntity WRITE_ENTITY =
+      new WriteEntity.Builder().withTable(TEST_TABLE_NAME).build();
+
+  /** Returns a ReaderContext instance for the passed datastore config params. */
+  static ReaderContext getReaderContext(Map<String, String> config) throws HCatException {
+    return DataTransferFactory.getHCatReader(READ_ENTITY, config).prepareRead();
+  }
+
+  /** Returns a WriterContext instance for the passed datastore config params. */
+  static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
+    return DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).prepareWrite();
+  }
+
+  /** Writes records to the table using the passed WriterContext. */
+  static void writeRecords(WriterContext context) throws HCatException {
+    DataTransferFactory.getHCatWriter(context).write(getHCatRecords(TEST_RECORDS_COUNT).iterator());
+  }
+
+  /** Commits the pending writes to the database. */
+  static void commitRecords(Map<String, String> config, WriterContext context) throws IOException {
+    DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).commit(context);
+  }
+
+  /** Returns a list of strings containing 'expected' test data for verification. */
+  static List<String> getExpectedRecords(int count) {
+    List<String> expected = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      expected.add("record " + i);
+    }
+    return expected;
+  }
+
+  /** Returns a list of HCatRecords of passed size. */
+  static List<HCatRecord> getHCatRecords(int size) {
+    List<HCatRecord> expected = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      expected.add(toHCatRecord(i));
+    }
+    return expected;
+  }
+
+  /** Inserts data into test datastore. */
+  static void insertTestData(Map<String, String> configMap) throws Exception {
+    WriterContext cntxt = getWriterContext(configMap);
+    writeRecords(cntxt);
+    commitRecords(configMap, cntxt);
+  }
+
+  /** Returns config params for the test datastore as a Map. */
+  static Map<String, String> getConfigPropertiesAsMap(HiveConf hiveConf) {
+    Map<String, String> map = new HashMap<>();
+    for (Entry<String, String> kv : hiveConf) {
+      map.put(kv.getKey(), kv.getValue());
+    }
+    return map;
+  }
+
+  /** Returns a DefaultHCatRecord instance for the passed value. */
+  static DefaultHCatRecord toHCatRecord(int value) {
+    return new DefaultHCatRecord(Arrays.<Object>asList("record " + value, value));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/hcatalog/src/test/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/resources/hive-site.xml b/sdks/java/io/hcatalog/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..5bb1496
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/test/resources/hive-site.xml
@@ -0,0 +1,301 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<!-- This file is a copy of https://github.com/apache/hive/blob/master/data/conf/hive-site.xml, used to support the embedded Hive metastore. -->
+<configuration>
+
+<property>
+  <name>hive.in.test</name>
+  <value>true</value>
+  <description>Internal marker for test. Used for masking env-dependent values</description>
+</property>
+
+<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
+<!-- that are implied by Hadoop setup variables.                                                -->
+<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
+<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
+<!-- resource).                                                                                 -->
+
+<!-- Hive Execution Parameters -->
+<property>
+  <name>hadoop.tmp.dir</name>
+  <value>${test.tmp.dir}/hadoop-tmp</value>
+  <description>A base for other temporary directories.</description>
+</property>
+
+<!--
+<property>
+  <name>hive.exec.reducers.max</name>
+  <value>1</value>
+  <description>maximum number of reducers</description>
+</property>
+-->
+
+<property>
+  <name>hive.exec.scratchdir</name>
+  <value>${test.tmp.dir}/scratchdir</value>
+  <description>Scratch space for Hive jobs</description>
+</property>
+
+<property>
+  <name>hive.exec.local.scratchdir</name>
+  <value>${test.tmp.dir}/localscratchdir/</value>
+  <description>Local scratch space for Hive jobs</description>
+</property>
+
+<property>
+  <name>datanucleus.schema.autoCreateAll</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionURL</name>
+  <value>jdbc:derby:;databaseName=${test.tmp.dir}/junit_metastore_db;create=true</value>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionDriverName</name>
+  <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionUserName</name>
+  <value>APP</value>
+</property>
+
+<property>
+  <name>javax.jdo.option.ConnectionPassword</name>
+  <value>mine</value>
+</property>
+
+<property>
+  <!--  this should eventually be deprecated since the metastore should supply this -->
+  <name>hive.metastore.warehouse.dir</name>
+  <value>${test.warehouse.dir}</value>
+  <description></description>
+</property>
+
+<property>
+  <name>hive.metastore.metadb.dir</name>
+  <value>file://${test.tmp.dir}/metadb/</value>
+  <description>
+  Required by metastore server or if the uris argument below is not supplied
+  </description>
+</property>
+
+<property>
+  <name>test.log.dir</name>
+  <value>${test.tmp.dir}/log/</value>
+  <description></description>
+</property>
+
+<property>
+  <name>test.data.files</name>
+  <value>${hive.root}/data/files</value>
+  <description></description>
+</property>
+
+<property>
+  <name>test.data.scripts</name>
+  <value>${hive.root}/data/scripts</value>
+  <description></description>
+</property>
+
+<property>
+  <name>hive.jar.path</name>
+  <value>${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar</value>
+  <description></description>
+</property>
+
+<property>
+  <name>hive.metastore.rawstore.impl</name>
+  <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
+  <description>Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. This class is used to store and retrieval of raw metadata objects such as table, database</description>
+</property>
+
+<property>
+  <name>hive.querylog.location</name>
+  <value>${test.tmp.dir}/tmp</value>
+  <description>Location of the structured hive logs</description>
+</property>
+
+<property>
+  <name>hive.exec.pre.hooks</name>
+  <value>org.apache.hadoop.hive.ql.hooks.PreExecutePrinter, org.apache.hadoop.hive.ql.hooks.EnforceReadOnlyTables</value>
+  <description>Pre Execute Hook for Tests</description>
+</property>
+
+<property>
+  <name>hive.exec.post.hooks</name>
+  <value>org.apache.hadoop.hive.ql.hooks.PostExecutePrinter</value>
+  <description>Post Execute Hook for Tests</description>
+</property>
+
+<property>
+  <name>hive.support.concurrency</name>
+  <value>true</value>
+  <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
+</property>
+
+<property>
+  <key>hive.unlock.numretries</key>
+  <value>2</value>
+  <description>The number of times you want to retry to do one unlock</description>
+</property>
+
+<property>
+  <key>hive.lock.sleep.between.retries</key>
+  <value>2</value>
+  <description>The sleep time (in seconds) between various retries</description>
+</property>
+
+
+<property>
+  <name>fs.pfile.impl</name>
+  <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+  <description>A proxy for local file system used for cross file system testing</description>
+</property>
+
+<property>
+  <name>hive.exec.mode.local.auto</name>
+  <value>false</value>
+  <description>
+    Let hive determine whether to run in local mode automatically
+    Disabling this for tests so that minimr is not affected
+  </description>
+</property>
+
+<property>
+  <name>hive.auto.convert.join</name>
+  <value>false</value>
+  <description>Whether Hive enable the optimization about converting common join into mapjoin based on the input file size</description>
+</property>
+
+<property>
+  <name>hive.ignore.mapjoin.hint</name>
+  <value>false</value>
+  <description>Whether Hive ignores the mapjoin hint</description>
+</property>
+
+<property>
+  <name>hive.input.format</name>
+  <value>org.apache.hadoop.hive.ql.io.CombineHiveInputFormat</value>
+  <description>The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat. </description>
+</property>
+
+<property>
+  <name>hive.default.rcfile.serde</name>
+  <value>org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe</value>
+  <description>The default SerDe hive will use for the rcfile format</description>
+</property>
+
+<property>
+  <name>hive.stats.key.prefix.reserve.length</name>
+  <value>0</value>
+</property>
+
+<property>
+  <name>hive.conf.restricted.list</name>
+  <value>dummy.config.value</value>
+  <description>Using dummy config value above because you cannot override config with empty value</description>
+</property>
+
+<property>
+  <name>hive.exec.submit.local.task.via.child</name>
+  <value>false</value>
+</property>
+
+
+<property>
+  <name>hive.dummyparam.test.server.specific.config.override</name>
+  <value>from.hive-site.xml</value>
+  <description>Using dummy param to test server specific configuration</description>
+</property>
+
+<property>
+  <name>hive.dummyparam.test.server.specific.config.hivesite</name>
+  <value>from.hive-site.xml</value>
+  <description>Using dummy param to test server specific configuration</description>
+</property>
+
+<property>
+  <name>test.var.hiveconf.property</name>
+  <value>${hive.exec.default.partition.name}</value>
+  <description>Test hiveconf property substitution</description>
+</property>
+
+<property>
+  <name>test.property1</name>
+  <value>value1</value>
+  <description>Test property defined in hive-site.xml only</description>
+</property>
+
+<property>
+  <name>hive.test.dummystats.aggregator</name>
+  <value>value2</value>
+</property>
+
+<property>
+  <name>hive.fetch.task.conversion</name>
+  <value>minimal</value>
+</property>
+
+<property>
+  <name>hive.users.in.admin.role</name>
+  <value>hive_admin_user</value>
+</property>
+
+<property>
+  <name>hive.llap.io.cache.orc.size</name>
+  <value>8388608</value>
+</property>
+
+<property>
+  <name>hive.llap.io.cache.orc.arena.size</name>
+  <value>8388608</value>
+</property>
+
+<property>
+  <name>hive.llap.io.cache.orc.alloc.max</name>
+  <value>2097152</value>
+</property>
+
+
+<property>
+  <name>hive.llap.io.cache.orc.alloc.min</name>
+  <value>32768</value>
+</property>
+
+<property>
+  <name>hive.llap.cache.allow.synthetic.fileid</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>hive.llap.io.use.lrfu</name>
+  <value>true</value>
+</property>
+
+
+<property>
+  <name>hive.llap.io.allocator.direct</name>
+  <value>false</value>
+</property>
+
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/beam/blob/349898c4/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 44f3baa..13cd418 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -72,6 +72,7 @@
     <module>hadoop-file-system</module>
     <module>hadoop</module>
     <module>hbase</module>
+    <module>hcatalog</module>
     <module>jdbc</module>
     <module>jms</module>
     <module>kafka</module>