You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/06/14 20:29:24 UTC
[beam] branch master updated: [BEAM-7450] Support unbounded reads
with HCatalogIO
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9bbe6f5 [BEAM-7450] Support unbounded reads with HCatalogIO
new 19804ac Merge pull request #8718: [BEAM-7450] Support unbounded reads with HCatalogIO
9bbe6f5 is described below
commit 9bbe6f523aac427958a4e99e4d729a80b105e63d
Author: Ankit Jhalaria <aj...@godaddy.com>
AuthorDate: Wed May 29 12:33:01 2019 -0700
[BEAM-7450] Support unbounded reads with HCatalogIO
---
.../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 91 +++++++++++++++--
.../apache/beam/sdk/io/hcatalog/HCatalogUtils.java | 87 ++++++++++++++++
.../beam/sdk/io/hcatalog/PartitionPollerFn.java | 56 +++++++++++
.../beam/sdk/io/hcatalog/PartitionReaderFn.java | 111 +++++++++++++++++++++
.../beam/sdk/io/hcatalog/HCatalogIOTest.java | 80 +++++++++++++++
5 files changed, 414 insertions(+), 11 deletions(-)
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 73518f6..05b43c6 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -33,15 +32,17 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -58,6 +59,7 @@ import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +85,20 @@ import org.slf4j.LoggerFactory;
* .withFilter(filterString) //optional, may be specified if the table is partitioned
* }</pre>
*
+ * <p>HCatalog source supports reading of HCatRecord in an unbounded mode. When run in an unbounded
+ * mode, HCatalogIO will continuously poll for new partitions and read that data. If provided with a
+ * termination condition, it will stop reading data after the condition is met.
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(HCatalogIO.read()
+ * .withConfigProperties(configProperties)
+ * .withDatabase("default") //optional, assumes default if none specified
+ * .withTable("employee")
+ * .withPollingInterval(Duration.millis(15000)) // poll for new partitions every 15 seconds
+ * .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000)))) //optional
+ * }</pre>
+ *
* <h3>Writing using HCatalog</h3>
*
* <p>HCatalog sink supports writing of HCatRecord to a HCatalog managed source, for eg. Hive.
@@ -120,7 +136,10 @@ public class HCatalogIO {
/** Read data from Hive. */
public static Read read() {
- return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
+ return new AutoValue_HCatalogIO_Read.Builder()
+ .setDatabase(DEFAULT_DATABASE)
+ .setPartitionCols(new ArrayList<>())
+ .build();
}
private HCatalogIO() {}
@@ -129,6 +148,7 @@ public class HCatalogIO {
@VisibleForTesting
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
+
@Nullable
abstract Map<String, String> getConfigProperties();
@@ -147,6 +167,15 @@ public class HCatalogIO {
@Nullable
abstract Integer getSplitId();
+ @Nullable
+ abstract Duration getPollingInterval();
+
+ @Nullable
+ abstract List<String> getPartitionCols();
+
+ @Nullable
+ abstract TerminationCondition<Read, ?> getTerminationCondition();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -163,6 +192,12 @@ public class HCatalogIO {
abstract Builder setContext(ReaderContext context);
+ abstract Builder setPollingInterval(Duration pollingInterval);
+
+ abstract Builder setPartitionCols(List<String> partitionCols);
+
+ abstract Builder setTerminationCondition(TerminationCondition<Read, ?> terminationCondition);
+
abstract Read build();
}
@@ -186,6 +221,28 @@ public class HCatalogIO {
return toBuilder().setFilter(filter).build();
}
+ /**
+ * If specified, polling for new partitions will happen at this periodicity. The returned
+ * PCollection will be unbounded. However, if a termination condition is also set via
+ * withTerminationCondition, polling will stop after that condition has been met.
+ */
+ public Read withPollingInterval(Duration pollingInterval) {
+ return toBuilder().setPollingInterval(pollingInterval).build();
+ }
+
+ /** Set the names of the columns that are partitions. */
+ public Read withPartitionCols(List<String> partitionCols) {
+ return toBuilder().setPartitionCols(partitionCols).build();
+ }
+
+ /**
+ * If specified, the poll function will stop polling after the termination condition has been
+ * satisfied.
+ */
+ public Read withTerminationCondition(TerminationCondition<Read, ?> terminationCondition) {
+ return toBuilder().setTerminationCondition(terminationCondition).build();
+ }
+
Read withSplitId(int splitId) {
checkArgument(splitId >= 0, "Invalid split id-" + splitId);
return toBuilder().setSplitId(splitId).build();
@@ -196,11 +253,27 @@ public class HCatalogIO {
}
@Override
+ @SuppressWarnings("deprecation")
public PCollection<HCatRecord> expand(PBegin input) {
checkArgument(getTable() != null, "withTable() is required");
checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
-
- return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
+ Watch.Growth<Read, Integer, Integer> growthFn;
+ if (getPollingInterval() != null) {
+ growthFn = Watch.growthOf(new PartitionPollerFn()).withPollInterval(getPollingInterval());
+ if (getTerminationCondition() != null) {
+ growthFn = growthFn.withTerminationPerInput(getTerminationCondition());
+ }
+ return input
+ .apply("ConvertToReadRequest", Create.of(this))
+ .apply("WatchForNewPartitions", growthFn)
+ .apply("PartitionReader", ParDo.of(new PartitionReaderFn(getConfigProperties())));
+ } else {
+ // Treat as Bounded
+ checkArgument(
+ getTerminationCondition() == null,
+ "withTerminationCondition() is not allowed in bounded reads mode (withPollingInterval() was not set)");
+ return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
+ }
}
@Override
@@ -244,14 +317,10 @@ public class HCatalogIO {
*/
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
- Configuration conf = new Configuration();
- for (Entry<String, String> entry : spec.getConfigProperties().entrySet()) {
- conf.set(entry.getKey(), entry.getValue());
- }
IMetaStoreClient client = null;
try {
- HiveConf hiveConf = HCatUtil.getHiveConf(conf);
- client = HCatUtil.getHiveMetastoreClient(hiveConf);
+ HiveConf hiveConf = HCatalogUtils.createHiveConf(spec);
+ client = HCatalogUtils.createMetaStoreClient(hiveConf);
Table table = HCatUtil.getTable(client, spec.getDatabase(), spec.getTable());
return StatsUtils.getFileSizeForTable(hiveConf, table);
} finally {
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java
new file mode 100644
index 0000000..bf3638e
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
+
+/** Utility class to enable meta store conf/client creation. */
+public class HCatalogUtils {
+
+ private static final int DESIRED_BUNDLE_SIZE_BYTES = 134217728; // 128 MB
+
+ static IMetaStoreClient createMetaStoreClient(Configuration conf)
+ throws IOException, MetaException {
+ final HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+ return HCatUtil.getHiveMetastoreClient(hiveConf);
+ }
+
+ static HiveConf createHiveConf(Read readRequest) throws IOException {
+ Configuration conf = createConfiguration(readRequest.getConfigProperties());
+ return HCatUtil.getHiveConf(conf);
+ }
+
+ static int getSplitCount(Read readRequest, Partition partitionToRead) throws Exception {
+ int desiredSplitCount = 1;
+ long estimatedSizeBytes = getFileSizeForPartition(readRequest, partitionToRead);
+ if (estimatedSizeBytes > 0) {
+ desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / DESIRED_BUNDLE_SIZE_BYTES);
+ }
+ return desiredSplitCount;
+ }
+
+ static Configuration createConfiguration(Map<String, String> configProperties) {
+ Configuration conf = new Configuration();
+ for (Map.Entry<String, String> entry : configProperties.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ return conf;
+ }
+
+ private static long getFileSizeForPartition(Read readRequest, Partition partitionToRead)
+ throws Exception {
+ IMetaStoreClient client = null;
+ try {
+ HiveConf hiveConf = HCatalogUtils.createHiveConf(readRequest);
+ client = HCatalogUtils.createMetaStoreClient(hiveConf);
+ List<org.apache.hadoop.hive.ql.metadata.Partition> p = new ArrayList<>();
+ Table table = HCatUtil.getTable(client, readRequest.getDatabase(), readRequest.getTable());
+ final org.apache.hadoop.hive.ql.metadata.Partition partition =
+ new org.apache.hadoop.hive.ql.metadata.Partition(table, partitionToRead);
+ p.add(partition);
+ final List<Long> fileSizeForPartitions = StatsUtils.getFileSizeForPartitions(hiveConf, p);
+ return fileSizeForPartitions.get(0);
+ } finally {
+ // IMetaStoreClient is not AutoCloseable, closing it manually
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+}
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java
new file mode 100644
index 0000000..2e40710
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.joda.time.Instant;
+
+/** Return the list of current partitions present. */
+class PartitionPollerFn extends PollFn<Read, Integer> {
+ private transient IMetaStoreClient metaStoreClient;
+
+ @Override
+ public PollResult<Integer> apply(Read element, Context c) throws Exception {
+ final Configuration conf = HCatalogUtils.createConfiguration(element.getConfigProperties());
+ metaStoreClient = HCatalogUtils.createMetaStoreClient(conf);
+ final Instant now = Instant.now();
+ final PollResult<Integer> pollResult =
+ PollResult.incomplete(now, getPartitionIndices(element)).withWatermark(now);
+ if (metaStoreClient != null) {
+ metaStoreClient.close();
+ }
+ return pollResult;
+ }
+
+ private List<Integer> getPartitionIndices(Read read) throws Exception {
+ return IntStream.range(
+ 0,
+ metaStoreClient
+ .listPartitions(read.getDatabase(), read.getTable(), Short.MAX_VALUE)
+ .size())
+ .boxed()
+ .collect(Collectors.toList());
+ }
+}
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java
new file mode 100644
index 0000000..70d0529
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+
+/** Reads partition at a given index. */
+class PartitionReaderFn extends DoFn<KV<Read, Integer>, HCatRecord> {
+ private transient IMetaStoreClient metaStoreClient;
+ private Map<String, String> configProperties;
+
+ public PartitionReaderFn(Map<String, String> configProperties) {
+ this.configProperties = configProperties;
+ }
+
+ private ReaderContext getReaderContext(Read readRequest, Integer partitionIndexToRead)
+ throws Exception {
+ final List<Partition> partitions =
+ metaStoreClient.listPartitions(
+ readRequest.getDatabase(), readRequest.getTable(), Short.MAX_VALUE);
+ final Partition partition = partitions.get(partitionIndexToRead);
+ checkArgument(
+ partition != null, "Unable to find a partition to read at index " + partitionIndexToRead);
+
+ final int desiredSplitCount = HCatalogUtils.getSplitCount(readRequest, partition);
+ final List<String> values = partition.getValues();
+ final List<String> partitionCols = readRequest.getPartitionCols();
+ checkArgument(
+ values.size() == partitionCols.size(),
+ "Number of partition columns should be equal to the number of partition values.");
+
+ List<String> filter = new ArrayList<>();
+ for (int i = 0; i < partitionCols.size(); i++) {
+ filter.add(partitionCols.get(i) + "=" + "'" + values.get(i) + "'");
+ }
+ final String filterString = String.join(" and ", filter);
+
+ ReadEntity entity =
+ new ReadEntity.Builder()
+ .withDatabase(readRequest.getDatabase())
+ .withFilter(filterString)
+ .withTable(readRequest.getTable())
+ .build();
+ // pass the 'desired' split count as a hint to the API
+ Map<String, String> configProps = new HashMap<>(readRequest.getConfigProperties());
+ configProps.put(
+ HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount));
+ return DataTransferFactory.getHCatReader(entity, configProps).prepareRead();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ final Read readRequest = c.element().getKey();
+ final Integer partitionIndexToRead = c.element().getValue();
+ ReaderContext readerContext = getReaderContext(readRequest, partitionIndexToRead);
+ for (int i = 0; i < readerContext.numSplits(); i++) {
+ HCatReader reader = DataTransferFactory.getHCatReader(readerContext, i);
+ Iterator<HCatRecord> hcatIterator = reader.read();
+ while (hcatIterator.hasNext()) {
+ final HCatRecord record = hcatIterator.next();
+ c.output(record);
+ }
+ }
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ final Configuration conf = HCatalogUtils.createConfiguration(configProperties);
+ metaStoreClient = HCatalogUtils.createMetaStoreClient(conf);
+ }
+
+ @Teardown
+ public void teardown() {
+ if (metaStoreClient != null) {
+ metaStoreClient.close();
+ }
+ }
+}
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index da631a3..7f925ef 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -40,8 +40,12 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.io.hcatalog.HCatalogIO.BoundedHCatalogSource;
import org.apache.beam.sdk.io.hcatalog.test.EmbeddedMetastoreService;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -52,11 +56,16 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -96,6 +105,9 @@ public class HCatalogIOTest implements Serializable {
prepareTestData();
} else if (description.getAnnotation(NeedsEmptyTestTables.class) != null) {
reCreateTestTable();
+ } else if (description.getAnnotation(NeedsEmptyTestTablesForUnboundedReads.class)
+ != null) {
+ reCreateTestTableForUnboundedReads();
}
base.evaluate();
}
@@ -110,6 +122,11 @@ public class HCatalogIOTest implements Serializable {
@Target({ElementType.METHOD})
private @interface NeedsTestData {}
+ /** Use this annotation to set up complete test data (table populated with unbounded records). */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ElementType.METHOD})
+ private @interface NeedsEmptyTestTablesForUnboundedReads {}
+
/** Use this annotation to setup test tables alone(empty tables, no records are populated). */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@@ -163,6 +180,56 @@ public class HCatalogIOTest implements Serializable {
readAfterWritePipeline.run();
}
+ private Map<String, String> getPartitions() {
+ Map<String, String> partitions = new HashMap<>();
+ partitions.put("load_date", "2019-05-14T23:28:04.425Z");
+ partitions.put("product_type", "1");
+ return partitions;
+ }
+
+ /** Perform end-to-end test of Write-then-unbounded-Read operation. */
+ @Test
+ @NeedsEmptyTestTablesForUnboundedReads
+ public void testWriteThenUnboundedReadSuccess() throws Exception {
+
+ defaultPipeline
+ .apply(Create.of(buildHCatRecords(TEST_RECORDS_COUNT)))
+ .apply(
+ HCatalogIO.write()
+ .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+ .withDatabase(TEST_DATABASE)
+ .withTable(TEST_TABLE)
+ .withPartition(getPartitions())
+ .withBatchSize(512L));
+ defaultPipeline.run();
+ final ImmutableList<String> partitions = ImmutableList.of("load_date", "product_type");
+ final PCollection<HCatRecord> data =
+ readAfterWritePipeline
+ .apply(
+ "ReadData",
+ HCatalogIO.read()
+ .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+ .withDatabase(TEST_DATABASE)
+ .withPartitionCols(partitions)
+ .withTable(TEST_TABLE)
+ .withPollingInterval(Duration.millis(15000))
+ .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000))))
+ .setCoder((Coder) WritableCoder.of(DefaultHCatRecord.class));
+
+ final PCollection<String> output =
+ data.apply(
+ ParDo.of(
+ new DoFn<HCatRecord, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().get(0).toString());
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT));
+ readAfterWritePipeline.run();
+ }
+
/** Test of Write to a non-existent table. */
@Test
public void testWriteFailureTableDoesNotExist() {
@@ -276,6 +343,19 @@ public class HCatalogIOTest implements Serializable {
service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)");
}
+ private void reCreateTestTableForUnboundedReads() throws CommandNeedRetryException {
+ service.executeQuery("drop table " + TEST_TABLE);
+ service.executeQuery(
+ "create table "
+ + TEST_TABLE
+ + "(mycol1 string, mycol2 int) "
+ + "partitioned by (load_date string, product_type string)");
+ service.executeQuery(
+ "ALTER TABLE "
+ + TEST_TABLE
+ + " ADD PARTITION (load_date='2019-05-14T23:28:04.425Z', product_type='1')");
+ }
+
private void prepareTestData() throws Exception {
reCreateTestTable();
insertTestData(getConfigPropertiesAsMap(service.getHiveConf()));