You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2012/03/30 02:44:55 UTC
svn commit: r1307220 - in /incubator/hcatalog/branches/branch-0.4: ./
src/java/org/apache/hcatalog/data/transfer/
src/java/org/apache/hcatalog/data/transfer/impl/
src/java/org/apache/hcatalog/data/transfer/state/
src/test/e2e/hcatalog/udfs/java/org/apa...
Author: gates
Date: Fri Mar 30 02:44:54 2012
New Revision: 1307220
URL: http://svn.apache.org/viewvc?rev=1307220&view=rev
Log:
HCATALOG-287 Add data api to HCatalog
Added:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java
incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java
incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java
incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java
Modified:
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1307220&r1=1307219&r2=1307220&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Fri Mar 30 02:44:54 2012
@@ -23,6 +23,7 @@ Release 0.4.1 - Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-287 Add data api to HCatalog (hashutosh)
IMPROVEMENTS
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.data.transfer.impl.HCatInputFormatReader;
+import org.apache.hcatalog.data.transfer.impl.HCatOutputFormatWriter;
+import org.apache.hcatalog.data.transfer.state.DefaultStateProvider;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/** Use this factory to get instances of {@link HCatReader} or {@link HCatWriter} at master and slave nodes.
+ */
+
+public class DataTransferFactory {
+
+ /**
+ * This should be called once from master node to obtain an instance of {@link HCatReader}.
+ * @param re built using {@link ReadEntity.Builder}
+ * @param config Any configuration which master node wants to pass to HCatalog
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final ReadEntity re, final Map<String,String> config) {
+ // In future, this may examine ReadEntity and/or config to return appropriate HCatReader
+ return new HCatInputFormatReader(re, config);
+ }
+
+ /**
+ * This should only be called once from each slave node to obtain an instance of {@link HCatReader}.
+ * @param split obtained at master node.
+ * @param config obtained at master node.
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split, final Configuration config) {
+ // In future, this may examine config to return appropriate HCatReader
+ return getHCatReader(split, config, DefaultStateProvider.get()); // Delegates with the default StateProvider.
+ }
+
+ /**
+ * This should only be called once from each slave node to obtain an instance of {@link HCatReader}.
+ * Use this overload if the external system has some state to provide to HCatalog.
+ * @param split obtained at master node.
+ * @param config obtained at master node.
+ * @param sp {@link StateProvider} supplied by the external system
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split, final Configuration config, StateProvider sp) {
+ // In future, this may examine config to return appropriate HCatReader
+ return new HCatInputFormatReader(split, config, sp);
+ }
+
+ /** This should be called at master node to obtain an instance of {@link HCatWriter}
+ * @param we built using {@link WriteEntity.Builder}
+ * @param config Any configuration which master wants to pass to HCatalog
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriteEntity we, final Map<String,String> config) {
+ // In future, this may examine WriteEntity and/or config to return appropriate HCatWriter
+ return new HCatOutputFormatWriter(we, config);
+ }
+
+ /** This should be called at slave nodes to obtain an instance of {@link HCatWriter}
+ * @param cntxt {@link WriterContext} obtained at master node.
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return getHCatWriter(cntxt, DefaultStateProvider.get()); // Delegates with the default StateProvider.
+ }
+
+ /** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
+ * If external system has some mechanism for providing state to HCatalog, this method
+ * can be used.
+ * @param cntxt {@link WriterContext} obtained at master node.
+ * @param sp {@link StateProvider}
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt, final StateProvider sp) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return new HCatOutputFormatWriter(cntxt.getConf(), sp); // Only the context's Configuration is handed to the writer.
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/EntityBase.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+/** This is a base class for {@link ReadEntity.Builder} / {@link WriteEntity.Builder}. Many fields in them are common,
+ * so this class contains the common fields.
+ */
+
+abstract class EntityBase {
+
+ String region;
+ String tableName;
+ String dbName;
+ Map<String,String> partitionKVs; // getPartitionKVs() returns this same map instance, not a copy.
+
+
+
+ /** Common getter methods for {@link ReadEntity} and {@link WriteEntity}; each one
+ * returns the corresponding {@link EntityBase} field as-is. */
+
+ abstract static class Entity extends EntityBase{
+
+ public String getRegion() {
+ return region;
+ }
+ public String getTableName() {
+ return tableName;
+ }
+ public String getDbName() {
+ return dbName;
+ }
+ public Map<String, String> getPartitionKVs() {
+ return partitionKVs;
+ }
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatReader.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/** This abstract class is internal to HCatalog and abstracts away the notion of
+ * underlying system from which reads will be done.
+ */
+
+public abstract class HCatReader{
+
+ /** This should be called at master node to obtain {@link ReaderContext} which then should be
+ * serialized and sent to slave nodes.
+ * @return {@link ReaderContext}
+ * @throws HCatException
+ */
+ public abstract ReaderContext prepareRead() throws HCatException;
+
+ /** This should be called at slave nodes to read {@link HCatRecord}s.
+ * @return {@link Iterator} of {@link HCatRecord}
+ * @throws HCatException
+ */
+ public abstract Iterator<HCatRecord> read() throws HCatException;
+
+ /** This constructor will be invoked by {@link DataTransferFactory} at master node.
+ * Don't use this constructor. Instead, use {@link DataTransferFactory}.
+ * @param re {@link ReadEntity} built using {@link ReadEntity.Builder}
+ * @param config key/value pairs copied into a new {@link Configuration}; may be null
+ */
+ protected HCatReader(final ReadEntity re, final Map<String,String> config) {
+ this(config);
+ this.re = re;
+ }
+
+ /** This constructor will be invoked by {@link DataTransferFactory} at slave nodes.
+ * Don't use this constructor. Instead, use {@link DataTransferFactory}.
+ * No {@link ReadEntity} is passed here, so {@code re} is left unset.
+ * @param config {@link Configuration} stored as-is
+ * @param sp {@link StateProvider} stored as-is
+ */
+
+ protected HCatReader(final Configuration config, StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ protected ReadEntity re; // This will be null at slaves.
+ protected Configuration conf;
+ protected ReaderContext info; // Never assigned in this class.
+ protected StateProvider sp; // This will be null at master.
+
+ private HCatReader(final Map<String,String> config) {
+ Configuration conf = new Configuration(); // Local that shadows the field until it is assigned below.
+ if (null != config) { // A null map leaves the new Configuration untouched.
+ for(Entry<String, String> kv : config.entrySet()){
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ if (null == conf) { // Possible when a null Configuration reached the slave-side constructor.
+ throw new IllegalStateException("HCatReader is not constructed correctly.");
+ }
+ return conf;
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/HCatWriter.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/** This abstraction is internal to HCatalog. This is to facilitate writing to HCatalog from external
+ * systems. Don't try to instantiate this directly. Instead, use {@link DataTransferFactory}
+ */
+
+public abstract class HCatWriter {
+
+ protected Configuration conf;
+ protected WriteEntity we; // This will be null at slave nodes.
+ protected WriterContext info; // Never assigned in this class.
+ protected StateProvider sp; // Set only by the slave-side constructor.
+
+ /** External system should invoke this method exactly once from a master node.
+ * @return {@link WriterContext} This should be serialized and sent to slave nodes to
+ * construct HCatWriter there.
+ * @throws HCatException
+ */
+ public abstract WriterContext prepareWrite() throws HCatException;
+
+ /** This method should be used at slave nodes to perform writes.
+ * @param recordItr {@link Iterator} over the records to be written into HCatalog.
+ * @throws HCatException
+ */
+ public abstract void write(final Iterator<HCatRecord> recordItr) throws HCatException;
+
+ /** This method should be called at master node. Primary purpose of this is to do metadata commit.
+ * @throws HCatException
+ */
+ public abstract void commit(final WriterContext context) throws HCatException;
+
+ /** This method should be called at master node. Primary purpose of this is to do cleanups in case
+ * of failures.
+ * @throws HCatException
+ */
+ public abstract void abort(final WriterContext context) throws HCatException;
+
+ /**
+ * This constructor will be used at master node
+ * @param we WriteEntity defines where in storage records should be written to.
+ * @param config Any configuration which external system wants to communicate to HCatalog
+ * for performing writes.
+ */
+ protected HCatWriter(final WriteEntity we, final Map<String,String> config) {
+ this(config);
+ this.we = we;
+ }
+
+ /** This constructor will be used at slave nodes; {@code sp} is stored as the {@link StateProvider}.
+ * @param config {@link Configuration} stored as-is
+ */
+ protected HCatWriter(final Configuration config, final StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ private HCatWriter(final Map<String,String> config) {
+ Configuration conf = new Configuration();
+ if(config != null){
+ // config comes from the caller and may be null; only a non-null map is copied.
+ for(Entry<String, String> kv : config.entrySet()){
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+
+ this.conf = conf;
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReadEntity.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class ReadEntity extends EntityBase.Entity{
+
+ private String filterString; // Copied from the Builder; null unless withFilter() supplied a value.
+
+ /** Don't instantiate {@link ReadEntity} directly. Use {@link ReadEntity.Builder} instead.
+ * This no-argument constructor is private and is not called anywhere in this class.
+ */
+ private ReadEntity() {
+ // Not allowed
+ }
+
+ private ReadEntity(Builder builder) {
+
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ this.filterString = builder.filterString;
+ }
+
+ public String getFilterString() {
+ return this.filterString;
+ }
+
+ /** This class should be used to build {@link ReadEntity}. It follows builder pattern, letting you build
+ * your {@link ReadEntity} with whatever level of detail you want.
+ * Each {@code with...} method returns this builder, so calls can be chained before {@link #build()}.
+ */
+ public static class Builder extends EntityBase {
+
+ private String filterString;
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String,String> partKVs) {
+ this.partitionKVs = partKVs; // Keeps the caller's map; no copy is made.
+ return this;
+ }
+
+ public Builder withFilter(String filterString) {
+ this.filterString = filterString;
+ return this;
+ }
+
+ public ReadEntity build() {
+ return new ReadEntity(this);
+ }
+ }
+}
\ No newline at end of file
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/ReaderContext.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/** This class will contain information of different {@link InputSplit} obtained at master node
+ * and configuration. This class implements {@link Externalizable} so it can be serialized using
+ * standard java mechanisms.
+ */
+public class ReaderContext implements Externalizable, Configurable {
+
+ private static final long serialVersionUID = -2656468331739574367L;
+ private List<InputSplit> splits;
+ private Configuration conf;
+
+ public ReaderContext() {
+ this.splits = new ArrayList<InputSplit>(); // readExternal() appends deserialized splits to this list.
+ this.conf = new Configuration(); // readExternal() reads fields into this instance.
+ }
+
+ public void setInputSplits(final List<InputSplit> splits) {
+ this.splits = splits; // Keeps the caller's list; no copy is made.
+ }
+
+ public List<InputSplit> getSplits() {
+ return splits;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out); // Written order: configuration, split count, then every split.
+ out.writeInt(splits.size());
+ for (InputSplit split : splits) {
+ ((HCatSplit)split).write(out); // NOTE(review): throws ClassCastException for any split that is not an HCatSplit.
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in); // Read order mirrors writeExternal().
+ int numOfSplits = in.readInt();
+ for (int i=0 ; i < numOfSplits; i++) {
+ HCatSplit split = new HCatSplit();
+ split.readFields(in);
+ splits.add(split); // Appends to the current list; existing entries are not cleared.
+ }
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriteEntity.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class WriteEntity extends EntityBase.Entity{
+
+ /** Don't instantiate {@link WriteEntity} directly. Use {@link Builder} to build
+ * {@link WriteEntity}.
+ */
+
+ private WriteEntity() {
+ // Not allowed.
+ }
+
+ private WriteEntity(Builder builder) {
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ }
+
+ /** This class should be used to build {@link WriteEntity}. It follows builder pattern, letting you build
+ * your {@link WriteEntity} with whatever level of detail you want.
+ * Each {@code with...} method returns this builder, so calls can be chained before {@link #build()}.
+ */
+ public static class Builder extends EntityBase{
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String,String> partKVs) {
+ this.partitionKVs = partKVs; // Keeps the caller's map; no copy is made.
+ return this;
+ }
+
+ public WriteEntity build() {
+ return new WriteEntity(this);
+ }
+
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/WriterContext.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/** This contains information obtained at master node to help prepare slave nodes for writer.
+ * This class implements {@link Externalizable} so it can be serialized using
+ * standard java mechanisms. Master should serialize it and make it available to slaves to
+ * prepare for writes.
+ */
+public class WriterContext implements Externalizable, Configurable{
+
+ private static final long serialVersionUID = -5899374262971611840L;
+ private Configuration conf;
+
+ public WriterContext() {
+ conf = new Configuration(); // readExternal() reads into this instance unless setConf() replaced it.
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out); // The Configuration is the only state written.
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in); // Mirrors writeExternal(): only the Configuration is read back.
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/**
+ * Reader implementation that obtains its records through {@link HCatInputFormat}.
+ * <p>
+ * {@link #prepareRead()} computes the input splits and returns them, together
+ * with the job configuration, in a {@link ReaderContext}. {@link #read()}
+ * iterates over the records of the single split given to the split-based
+ * constructor.
+ */
+public class HCatInputFormatReader extends HCatReader{
+
+  /** Split to read; only set by the split-based constructor. */
+  private InputSplit split;
+
+  /**
+   * Creates a reader for one input split.
+   * @param split  split whose records {@link #read()} will return
+   * @param config configuration passed on to the parent reader
+   * @param sp     state provider passed on to the parent reader
+   */
+  public HCatInputFormatReader(InputSplit split, Configuration config, StateProvider sp) {
+    super(config, sp);
+    this.split = split;
+  }
+
+  /**
+   * Creates a reader used to prepare a read of the given entity.
+   * @param info   entity (database, table, filter) to read
+   * @param config configuration key/value pairs passed on to the parent reader
+   */
+  public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
+    super(info,config);
+  }
+
+  /**
+   * Sets up an {@link HCatInputFormat} job for the entity and computes its splits.
+   * @return context holding the input splits and the job configuration
+   * @throws HCatException if the job cannot be set up or the splits cannot be computed
+   */
+  @Override
+  public ReaderContext prepareRead() throws HCatException {
+
+    try {
+      Job job = new Job(conf);
+      InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(), re.getTableName(), re.getFilterString());
+      HCatInputFormat.setInput(job, jobInfo);
+      HCatInputFormat hcif = new HCatInputFormat();
+      ReaderContext cntxt = new ReaderContext();
+      cntxt.setInputSplits(hcif.getSplits(new JobContext(job.getConfiguration(), null)));
+      cntxt.setConf(job.getConfiguration());
+      return cntxt;
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED,e);
+    }
+  }
+
+  /**
+   * Opens a record reader on this reader's split.
+   * @return iterator over the records of the split
+   * @throws HCatException if the record reader cannot be created or initialized
+   */
+  @Override
+  public Iterator<HCatRecord> read() throws HCatException {
+
+    HCatInputFormat inpFmt = new HCatInputFormat();
+    RecordReader<WritableComparable, HCatRecord> rr;
+    try {
+      TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID());
+      rr = inpFmt.createRecordReader(split, cntxt);
+      rr.initialize(split, cntxt);
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+    return new HCatRecordItr(rr);
+  }
+
+
+  /**
+   * Iterator view over a {@link RecordReader}.
+   * <p>
+   * The underlying reader is advanced at most once per returned record, so
+   * calling {@link #hasNext()} repeatedly does not skip records. The reader is
+   * closed as soon as it reports that no more records are available.
+   */
+  private static class HCatRecordItr implements Iterator<HCatRecord>{
+
+    private RecordReader<WritableComparable, HCatRecord> curRecReader;
+
+    /** True when the reader is positioned on a record not yet handed out by next(). */
+    private boolean recordPending;
+
+    /** True once the reader has reported end of input; it must not be touched afterwards. */
+    private boolean exhausted;
+
+    HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+      curRecReader = rr;
+    }
+
+    /**
+     * @return true if another record is available
+     * @throws RuntimeException wrapping any IOException or InterruptedException
+     *         raised by the underlying reader
+     */
+    @Override
+    public boolean hasNext(){
+      if (exhausted) {
+        return false;
+      }
+      if (recordPending) {
+        // Already advanced for this record; do not advance again.
+        return true;
+      }
+      try {
+        if (curRecReader.nextKeyValue()) {
+          recordPending = true;
+          return true;
+        }
+        // No more records: mark first so a failing close() is not retried,
+        // then release the reader.
+        exhausted = true;
+        curRecReader.close();
+        return false;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * @return the record the reader is currently positioned on
+     * @throws java.util.NoSuchElementException if no record is left
+     */
+    @Override
+    public HCatRecord next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException("No more records");
+      }
+      recordPending = false;
+      try {
+        return curRecReader.getCurrentValue();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /** Removal is not supported. */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Not allowed");
+    }
+  }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+/**
+ * Writer implementation that stores records through {@link HCatOutputFormat}.
+ * <p>
+ * {@link #prepareWrite()} sets up the output job, {@link #write(Iterator)}
+ * writes and commits one task's records, and {@link #commit(WriterContext)} /
+ * {@link #abort(WriterContext)} finish or discard the job.
+ */
+public class HCatOutputFormatWriter extends HCatWriter {
+
+  /**
+   * @param we     entity (database, table, partition values) to write to
+   * @param config configuration key/value pairs passed on to the parent writer
+   */
+  public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
+    super(we, config);
+  }
+
+  /**
+   * @param config configuration passed on to the parent writer
+   * @param sp     state provider passed on to the parent writer
+   */
+  public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
+    super(config, sp);
+  }
+
+  /**
+   * Configures the output job for the entity, checks the output specs and runs
+   * the committer's job setup.
+   * @return context carrying the resulting job configuration
+   * @throws HCatException if any of the setup steps fails
+   */
+  @Override
+  public WriterContext prepareWrite() throws HCatException {
+    OutputJobInfo outputInfo = OutputJobInfo.create(we.getDbName(), we.getTableName(), we.getPartitionKVs());
+    try {
+      Job job = new Job(conf);
+      HCatOutputFormat.setOutput(job, outputInfo);
+      HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+
+      HCatOutputFormat format = new HCatOutputFormat();
+      format.checkOutputSpecs(job);
+      TaskAttemptContext setupCntxt = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+      format.getOutputCommitter(setupCntxt).setupJob(job);
+
+      WriterContext result = new WriterContext();
+      result.setConf(job.getConfiguration());
+      return result;
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+  }
+
+  /**
+   * Writes every record of the iterator as one task and commits the task if
+   * the committer asks for it. On an I/O failure or interruption the task is
+   * aborted before the error is reported.
+   * @param recordItr records to write
+   * @throws HCatException if writing, committing or aborting fails
+   */
+  @Override
+  public void write(Iterator<HCatRecord> recordItr) throws HCatException {
+
+    final int taskNum = sp.getId();
+    setVarsInConf(taskNum);
+    HCatOutputFormat format = new HCatOutputFormat();
+    TaskAttemptContext taskCntxt = new TaskAttemptContext(conf, new TaskAttemptID(new TaskID(), taskNum));
+    OutputCommitter taskCommitter = null;
+    try {
+      taskCommitter = format.getOutputCommitter(taskCntxt);
+      taskCommitter.setupTask(taskCntxt);
+      RecordWriter<WritableComparable<?>, HCatRecord> recWriter = format.getRecordWriter(taskCntxt);
+      while (recordItr.hasNext()) {
+        recWriter.write(null, recordItr.next());
+      }
+      recWriter.close(taskCntxt);
+      if (taskCommitter.needsTaskCommit(taskCntxt)) {
+        taskCommitter.commitTask(taskCntxt);
+      }
+    } catch (IOException e) {
+      abortTaskAfterFailure(taskCommitter, taskCntxt);
+      throw new HCatException("Failed while writing",e);
+    } catch (InterruptedException e) {
+      abortTaskAfterFailure(taskCommitter, taskCntxt);
+      throw new HCatException("Failed while writing", e);
+    }
+  }
+
+  /**
+   * Commits the job described by the context.
+   * @param context context whose configuration describes the job
+   * @throws HCatException if the committer cannot be obtained or the commit fails
+   */
+  @Override
+  public void commit(WriterContext context) throws HCatException {
+    try {
+      jobCommitter(context).commitJob(new JobContext(context.getConf(), null));
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+  }
+
+  /**
+   * Aborts the job described by the context, marking it as failed.
+   * @param context context whose configuration describes the job
+   * @throws HCatException if the committer cannot be obtained or the abort fails
+   */
+  @Override
+  public void abort(WriterContext context) throws HCatException {
+    try {
+      jobCommitter(context).abortJob(new JobContext(context.getConf(), null),State.FAILED);
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    } catch (InterruptedException e) {
+      throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+    }
+  }
+
+  /** Obtains an output committer built from the configuration held by the context. */
+  private OutputCommitter jobCommitter(WriterContext context) throws IOException, InterruptedException {
+    TaskAttemptContext attemptCntxt = new TaskAttemptContext(context.getConf(), new TaskAttemptID());
+    return new HCatOutputFormat().getOutputCommitter(attemptCntxt);
+  }
+
+  /**
+   * Aborts the task if a committer was obtained before the failure; a failing
+   * abort is reported as an internal error.
+   */
+  private void abortTaskAfterFailure(OutputCommitter taskCommitter, TaskAttemptContext taskCntxt)
+      throws HCatException {
+    if (taskCommitter == null) {
+      return;
+    }
+    try {
+      taskCommitter.abortTask(taskCntxt);
+    } catch (IOException abortFailure) {
+      throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, abortFailure);
+    }
+  }
+
+  /**
+   * Stores the task partition and task id in the configuration.
+   * FileOutputFormat needs both keys to work correctly. In a regular Hadoop job
+   * the JobTracker sets them before launching tasks; there is no JobTracker
+   * here, so they are set by hand.
+   */
+  private void setVarsInConf(int id) {
+    conf.setInt("mapred.task.partition", id);
+    conf.set("mapred.task.id", "attempt__0000_r_000000_"+id);
+  }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+import java.text.NumberFormat;
+import java.util.Random;
+
+
+/**
+ * Default {@link StateProvider}: hands out randomly generated, non-negative ids.
+ */
+public class DefaultStateProvider implements StateProvider {
+
+  /** Shared generator; one instance is enough for all callers. */
+  private static final Random RANDOM = new Random();
+
+  /** Lazily created singleton returned by {@link #get()}. */
+  private static StateProvider sp;
+
+  /**
+   * Default implementation. Here, ids are generated randomly.
+   * <p>
+   * The bounded {@code nextInt} is used instead of {@code Math.abs(nextInt())}
+   * because {@code Math.abs(Integer.MIN_VALUE)} is still negative.
+   *
+   * @return a random id in the range {@code [0, Integer.MAX_VALUE)}
+   */
+  @Override
+  public int getId() {
+    return RANDOM.nextInt(Integer.MAX_VALUE);
+  }
+
+  /**
+   * @return the shared provider instance, created on first use
+   */
+  public static synchronized StateProvider get() {
+    if (null == sp) {
+      sp = new DefaultStateProvider();
+    }
+    return sp;
+  }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * Hook through which an external system can hand state over to slave nodes.
+ * In Map-Reduce, for example, such state would be the ids that the
+ * {@link JobTracker} assigns to each {@link TaskTracker}.
+ */
+public interface StateProvider {
+
+  /**
+   * Returns the id that was assigned to the slave node.
+   *
+   * @return id
+   */
+  int getId();
+}
Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderMaster.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.utils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+
+/**
+ * Master-side driver of the read path: prepares a read for the configured
+ * table and serializes the resulting {@link ReaderContext} to a file.
+ * <p>
+ * Arguments: {@code args[0]} is a properties file with the configuration
+ * (including the key {@code table}); {@code args[1]} is the file the
+ * serialized context is written to.
+ */
+public class DataReaderMaster {
+
+  public static void main(String[] args) throws FileNotFoundException, IOException {
+
+    // This config contains all the configuration that master node wants to provide
+    // to the HCatalog.
+    Properties externalConfigs = new Properties();
+    FileReader propsReader = new FileReader(args[0]);
+    try {
+      externalConfigs.load(propsReader);
+    } finally {
+      // Release the file handle even when the properties cannot be parsed.
+      propsReader.close();
+    }
+    Map<String,String> config = new HashMap<String, String>();
+
+    for (Entry<Object, Object> kv : externalConfigs.entrySet()){
+      config.put((String)kv.getKey(), (String)kv.getValue());
+    }
+
+    // This piece of code runs in master node and gets necessary context.
+    ReaderContext context = runsInMaster(config);
+
+    // Master node will serialize readercontext and will make it available at slaves.
+    FileOutputStream fos = new FileOutputStream(new File(args[1]));
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(fos);
+      oos.writeObject(context);
+      // Push everything buffered by the object stream into the file before it is closed.
+      oos.flush();
+    } finally {
+      fos.close();
+    }
+  }
+
+  /**
+   * Builds a read entity for the table named by the {@code table} key and
+   * prepares the read.
+   * @param config configuration key/value pairs
+   * @return context produced by {@code prepareRead()}
+   * @throws HCatException if preparing the read fails
+   */
+  private static ReaderContext runsInMaster(Map<String,String> config) throws HCatException {
+
+    ReadEntity.Builder builder = new ReadEntity.Builder();
+    ReadEntity entity = builder.withTable(config.get("table")).build();
+    HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+    ReaderContext cntxt = reader.prepareRead();
+    return cntxt;
+  }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataReaderSlave.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,44 @@
+package org.apache.hcatalog.utils;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+
+/**
+ * Slave-side driver of the read path: deserializes a {@link ReaderContext}
+ * and dumps the records of the requested splits into text files.
+ * <p>
+ * Arguments: {@code args[0]} is the file holding the serialized context;
+ * {@code args[1]} is a comma-separated list of split indexes to read;
+ * {@code args[2]} is the output file prefix (the i-th listed split goes to
+ * {@code <prefix>-i}).
+ */
+public class DataReaderSlave {
+
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
+
+    ReaderContext cntxt;
+    FileInputStream fis = new FileInputStream(new File(args[0]));
+    try {
+      ObjectInputStream ois = new ObjectInputStream(fis);
+      cntxt = (ReaderContext) ois.readObject();
+    } finally {
+      // Release the file handle even when deserialization fails.
+      fis.close();
+    }
+
+    String[] inpSlitsToRead = args[1].split(",");
+    List<InputSplit> splits = cntxt.getSplits();
+
+    for (int i = 0; i < inpSlitsToRead.length; i++){
+      InputSplit split = splits.get(Integer.parseInt(inpSlitsToRead[i]));
+      HCatReader reader = DataTransferFactory.getHCatReader(split, cntxt.getConf());
+      Iterator<HCatRecord> itr = reader.read();
+      File f = new File(args[2]+"-"+i);
+      f.delete();
+      BufferedWriter outFile = new BufferedWriter(new FileWriter(f));
+      try {
+        while(itr.hasNext()){
+          // Strip trailing whitespace from the record's text form.
+          String rec = itr.next().toString().replaceFirst("\\s+$", "");
+          System.err.println(rec);
+          outFile.write(rec+"\n");
+        }
+      } finally {
+        // Close (and flush) the output even when reading a record fails.
+        outFile.close();
+      }
+    }
+  }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterMaster.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import javax.imageio.stream.FileImageInputStream;
+
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+
+/**
+ * Master-side driver of the write path.
+ * <p>
+ * Arguments: {@code args[0]} is a properties file with the configuration
+ * (including the key {@code table}); {@code args[1]} is the file holding the
+ * serialized {@link WriterContext}. When a third argument equal to
+ * {@code commit} (case-insensitive) is given, the context is read from
+ * {@code args[1]} and the write is committed; otherwise the write is prepared
+ * and the resulting context is serialized to {@code args[1]}.
+ */
+public class DataWriterMaster {
+
+  public static void main(String[] args) throws FileNotFoundException, IOException, ClassNotFoundException {
+
+    // This config contains all the configuration that master node wants to provide
+    // to the HCatalog.
+    Properties externalConfigs = new Properties();
+    FileReader propsReader = new FileReader(args[0]);
+    try {
+      externalConfigs.load(propsReader);
+    } finally {
+      // Release the file handle even when the properties cannot be parsed.
+      propsReader.close();
+    }
+    Map<String,String> config = new HashMap<String, String>();
+
+    for (Entry<Object, Object> kv : externalConfigs.entrySet()){
+      System.err.println("k: " + kv.getKey() + "\t v: " + kv.getValue());
+      config.put((String)kv.getKey(), (String)kv.getValue());
+    }
+
+    if(args.length == 3 && "commit".equalsIgnoreCase(args[2])){
+      // Then, master commits if everything goes well.
+      WriterContext cntxt;
+      FileInputStream fis = new FileInputStream(new File(args[1]));
+      try {
+        ObjectInputStream ois = new ObjectInputStream(fis);
+        cntxt = (WriterContext)ois.readObject();
+      } finally {
+        fis.close();
+      }
+      commit(config,true, cntxt);
+      System.exit(0);
+    }
+    // This piece of code runs in master node and gets necessary context.
+    WriterContext cntxt = runsInMaster(config);
+
+
+    // Master node will serialize writercontext and will make it available at slaves.
+    File f = new File(args[1]);
+    f.delete();
+    FileOutputStream fos = new FileOutputStream(f);
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(fos);
+      oos.writeObject(cntxt);
+      // Push everything buffered by the object stream into the file before it is closed.
+      oos.flush();
+    } finally {
+      fos.close();
+    }
+  }
+
+  /**
+   * Builds a write entity for the table named by the {@code table} key and
+   * prepares the write.
+   * @param config configuration key/value pairs
+   * @return context produced by {@code prepareWrite()}
+   * @throws HCatException if preparing the write fails
+   */
+  private static WriterContext runsInMaster(Map<String, String> config) throws HCatException {
+
+    WriteEntity.Builder builder = new WriteEntity.Builder();
+    WriteEntity entity = builder.withTable(config.get("table")).build();
+    HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+    WriterContext info = writer.prepareWrite();
+    return info;
+  }
+
+  /**
+   * Finishes the write described by the context.
+   * @param config configuration key/value pairs (the {@code table} key names the table)
+   * @param status true to commit, false to abort
+   * @param cntxt  context obtained when the write was prepared
+   * @throws HCatException if the commit or abort fails
+   */
+  private static void commit(Map<String, String> config, boolean status, WriterContext cntxt) throws HCatException {
+
+    WriteEntity.Builder builder = new WriteEntity.Builder();
+    WriteEntity entity = builder.withTable(config.get("table")).build();
+    HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+    if(status){
+      writer.commit(cntxt);
+    } else {
+      writer.abort(cntxt);
+    }
+  }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/DataWriterSlave.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,69 @@
+package org.apache.hcatalog.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriterContext;
+
+public class DataWriterSlave {
+
+ public static void main(String[] args) throws FileNotFoundException, IOException, ClassNotFoundException {
+
+ ObjectInputStream ois = new ObjectInputStream(new FileInputStream(args[0]));
+ WriterContext cntxt = (WriterContext) ois.readObject();
+ ois.close();
+
+ HCatWriter writer = DataTransferFactory.getHCatWriter(cntxt);
+ writer.write(new HCatRecordItr(args[1]));
+
+ }
+
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+ BufferedReader reader;
+ String curLine;
+
+ public HCatRecordItr(String fileName) throws FileNotFoundException {
+ reader = new BufferedReader(new FileReader(new File(fileName)));
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ curLine = reader.readLine();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null == curLine ? false : true;
+ }
+
+ @Override
+ public HCatRecord next() {
+
+ String[] fields = curLine.split("\t");
+ List<Object> data = new ArrayList<Object>(3);
+ data.add(fields[0]);
+ data.add(Integer.parseInt(fields[1]));
+ data.add(Double.parseDouble(fields[2]));
+ return new DefaultHCatRecord(data);
+ }
+
+ @Override
+ public void remove() {
+ // TODO Auto-generated method stub
+
+ }
+ }
+}
Added: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java?rev=1307220&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/data/TestReaderWriter.java Fri Mar 30 02:44:54 2012
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.data;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestReaderWriter {
+
+ @Test
+ public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException {
+
+ HiveConf conf = new HiveConf(getClass());
+ Driver driver = new Driver(conf);
+ SessionState.start(new CliSessionState(conf));
+ driver.run("drop table mytbl");
+ driver.run("create table mytbl (a string, b int)");
+ Iterator<Entry<String,String>> itr = conf.iterator();
+ Map<String,String> map = new HashMap<String, String>();
+ while(itr.hasNext()){
+ Entry<String,String> kv = itr.next();
+ map.put(kv.getKey(), kv.getValue());
+ }
+
+ WriterContext cntxt = runsInMaster(map);
+
+ File writeCntxtFile = File.createTempFile("hcat-write", "temp");
+ writeCntxtFile.deleteOnExit();
+
+ // Serialize context.
+ ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
+ oos.writeObject(cntxt);
+ oos.flush();
+ oos.close();
+
+ // Now, deserialize it.
+ ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
+ cntxt = (WriterContext) ois.readObject();
+ ois.close();
+
+ runsInSlave(cntxt);
+ commit(map, true, cntxt);
+
+ ReaderContext readCntxt = runsInMaster(map, false);
+
+ File readCntxtFile = File.createTempFile("hcat-read", "temp");
+ readCntxtFile.deleteOnExit();
+ oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
+ oos.writeObject(readCntxt);
+ oos.flush();
+ oos.close();
+
+ ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
+ readCntxt = (ReaderContext) ois.readObject();
+ ois.close();
+
+
+ for(InputSplit split : readCntxt.getSplits()){
+ runsInSlave(split, readCntxt.getConf());
+ }
+ }
+
+ private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
+
+ WriteEntity.Builder builder = new WriteEntity.Builder();
+ WriteEntity entity = builder.withTable("mytbl").build();
+ HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+ WriterContext info = writer.prepareWrite();
+ return info;
+ }
+
+ private ReaderContext runsInMaster(Map<String,String> config, boolean bogus) throws HCatException {
+
+ ReadEntity.Builder builder = new ReadEntity.Builder();
+ ReadEntity entity = builder.withTable("mytbl").build();
+ HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+ ReaderContext cntxt = reader.prepareRead();
+ return cntxt;
+ }
+
+ private void runsInSlave(InputSplit split, Configuration config) throws HCatException {
+
+ HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+ Iterator<HCatRecord> itr = reader.read();
+ int i = 1;
+ while(itr.hasNext()){
+ HCatRecord read = itr.next();
+ HCatRecord written = getRecord(i++);
+ // Argh, HCatRecord doesnt implement equals()
+ Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0), written.get(0).equals(read.get(0)));
+ Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1), written.get(1).equals(read.get(1)));
+ Assert.assertEquals(2, read.size());
+ }
+ Assert.assertFalse(itr.hasNext());
+ }
+
+ private void runsInSlave(WriterContext context) throws HCatException {
+
+ HCatWriter writer = DataTransferFactory.getHCatWriter(context);
+ writer.write(new HCatRecordItr());
+ }
+
+ private void commit(Map<String, String> config, boolean status, WriterContext context) throws IOException {
+
+ WriteEntity.Builder builder = new WriteEntity.Builder();
+ WriteEntity entity = builder.withTable("mytbl").build();
+ HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+ if(status){
+ writer.commit(context);
+ } else {
+ writer.abort(context);
+ }
+ }
+
+ private static HCatRecord getRecord(int i) {
+ List<Object> list = new ArrayList<Object>(2);
+ list.add("Row #: " + i);
+ list.add(i);
+ return new DefaultHCatRecord(list);
+ }
+
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+ int i = 0;
+ @Override
+ public boolean hasNext() {
+ return i++ < 100 ? true : false;
+ }
+
+ @Override
+ public HCatRecord next() {
+ return getRecord(i);
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException();
+ }
+ }
+}