Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [4/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/ ...
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+/**
+ * Base class for {@link ReadEntity.Builder} and {@link WriteEntity.Builder};
+ * it holds the fields the two builders have in common.
+ */
+
+abstract class EntityBase {
+
+ String region;
+ String tableName;
+ String dbName;
+ Map<String, String> partitionKVs;
+
+ /**
+ * Common getters for {@link ReadEntity} and {@link WriteEntity}.
+ */
+
+ abstract static class Entity extends EntityBase {
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public Map<String, String> getPartitionKVs() {
+ return partitionKVs;
+ }
+ }
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/**
+ * This abstract class is internal to HCatalog and abstracts away the
+ * underlying system from which reads are done.
+ */
+
+public abstract class HCatReader {
+
+ /**
+ * This should be called at the master node to obtain a {@link ReaderContext},
+ * which should then be serialized and sent to the slave nodes.
+ *
+ * @return {@link ReaderContext}
+ * @throws HCatException
+ */
+ public abstract ReaderContext prepareRead() throws HCatException;
+
+ /**
+ * This should be called at the slave nodes to read {@link HCatRecord}s.
+ *
+ * @return {@link Iterator} of {@link HCatRecord}
+ * @throws HCatException
+ */
+ public abstract Iterator<HCatRecord> read() throws HCatException;
+
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at the
+ * master node. Don't use this constructor directly; use
+ * {@link DataTransferFactory} instead.
+ *
+ * @param re
+ * @param config
+ */
+ protected HCatReader(final ReadEntity re, final Map<String, String> config) {
+ this(config);
+ this.re = re;
+ }
+
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at the
+ * slave nodes. Don't use this constructor directly; use
+ * {@link DataTransferFactory} instead.
+ *
+ * @param config
+ * @param sp
+ */
+
+ protected HCatReader(final Configuration config, StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ protected ReadEntity re; // This will be null at slaves.
+ protected Configuration conf;
+ protected ReaderContext info;
+ protected StateProvider sp; // This will be null at master.
+
+ private HCatReader(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (null != config) {
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ if (null == conf) {
+ throw new IllegalStateException(
+ "HCatReader is not constructed correctly.");
+ }
+ return conf;
+ }
+}
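
For orientation, here is a minimal master-side sketch of how this reader is
meant to be driven. DataTransferFactory is referenced by the Javadoc but is
not part of this hunk, so the getHCatReader(ReadEntity, Map) overload is an
assumption here, and the database/table names are placeholders:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hcatalog.common.HCatException;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatReader;
    import org.apache.hcatalog.data.transfer.ReadEntity;
    import org.apache.hcatalog.data.transfer.ReaderContext;

    public class MasterReadSketch {
      public static ReaderContext prepareOnMaster() throws HCatException {
        ReadEntity entity = new ReadEntity.Builder()
            .withDatabase("default")   // placeholder database
            .withTable("mytbl")        // placeholder table
            .build();
        Map<String, String> config = new HashMap<String, String>();
        // Assumed factory method; the Javadoc directs callers to DataTransferFactory.
        HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
        // The returned ReaderContext is Externalizable; serialize it and ship it
        // to the slave nodes, which construct their own readers from it.
        return reader.prepareRead();
      }
    }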
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/**
+ * This abstraction is internal to HCatalog and facilitates writing to
+ * HCatalog from external systems. Don't try to instantiate this directly;
+ * use {@link DataTransferFactory} instead.
+ */
+
+public abstract class HCatWriter {
+
+ protected Configuration conf;
+ protected WriteEntity we; // This will be null at slave nodes.
+ protected WriterContext info;
+ protected StateProvider sp;
+
+ /**
+ * An external system should invoke this method exactly once, from the
+ * master node.
+ *
+ * @return {@link WriterContext} This should be serialized and sent to the
+ * slave nodes to construct an HCatWriter there.
+ * @throws HCatException
+ */
+ public abstract WriterContext prepareWrite() throws HCatException;
+
+ /**
+ * This method should be called at the slave nodes to perform writes.
+ *
+ * @param recordItr
+ * {@link Iterator} of records to be written into HCatalog.
+ * @throws {@link HCatException}
+ */
+ public abstract void write(final Iterator<HCatRecord> recordItr)
+ throws HCatException;
+
+ /**
+ * This method should be called at the master node. Its primary purpose is
+ * to commit the metadata.
+ *
+ * @throws {@link HCatException}
+ */
+ public abstract void commit(final WriterContext context) throws HCatException;
+
+ /**
+ * This method should be called at the master node. Its primary purpose is
+ * to clean up in case of failures.
+ *
+ * @throws {@link HCatException}
+ */
+ public abstract void abort(final WriterContext context) throws HCatException;
+
+ /**
+ * This constructor will be used at the master node.
+ *
+ * @param we
+ * WriteEntity that defines where in storage the records should be written.
+ * @param config
+ * Any configuration the external system wants to communicate to HCatalog
+ * for performing writes.
+ */
+ protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
+ this(config);
+ this.we = we;
+ }
+
+ /**
+ * This constructor will be used at slave nodes.
+ *
+ * @param config
+ */
+ protected HCatWriter(final Configuration config, final StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ private HCatWriter(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (config != null) {
+ // The config comes from the user, so it could be null.
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+
+ this.conf = conf;
+ }
+}
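
The intended call sequence, per the Javadocs above, is prepareWrite() once at
the master, write() at the slaves, then commit() or abort() back at the
master. A minimal in-process sketch of that sequence follows; the
getHCatWriter(WriteEntity, Map) and getHCatWriter(WriterContext) factory
overloads are assumptions, and the names are placeholders:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.hcatalog.common.HCatException;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatWriter;
    import org.apache.hcatalog.data.transfer.WriteEntity;
    import org.apache.hcatalog.data.transfer.WriterContext;

    public class WriteFlowSketch {
      public static void run(Iterator<HCatRecord> records) throws HCatException {
        WriteEntity entity = new WriteEntity.Builder()
            .withDatabase("default")   // placeholder names
            .withTable("mytbl")
            .build();
        Map<String, String> config = new HashMap<String, String>();
        HCatWriter master = DataTransferFactory.getHCatWriter(entity, config);  // assumed overload
        WriterContext context = master.prepareWrite();
        // 'context' would normally be serialized out to the slave nodes; this
        // sketch stays in one process for brevity.
        HCatWriter slave = DataTransferFactory.getHCatWriter(context);          // assumed overload
        try {
          slave.write(records);
          master.commit(context);   // metadata commit at the master
        } catch (HCatException e) {
          master.abort(context);    // cleanup at the master on failure
          throw e;
        }
      }
    }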
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class ReadEntity extends EntityBase.Entity {
+
+ private String filterString;
+
+ /**
+ * Don't instantiate {@link ReadEntity} directly. Use
+ * {@link ReadEntity.Builder} instead.
+ *
+ */
+ private ReadEntity() {
+ // Not allowed
+ }
+
+ private ReadEntity(Builder builder) {
+
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ this.filterString = builder.filterString;
+ }
+
+ public String getFilterString() {
+ return this.filterString;
+ }
+
+ /**
+ * This class should be used to build {@link ReadEntity}. It follows the
+ * builder pattern, letting you build a {@link ReadEntity} with whatever
+ * level of detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
+
+ private String filterString;
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
+
+ public Builder withFilter(String filterString) {
+ this.filterString = filterString;
+ return this;
+ }
+
+ public ReadEntity build() {
+ return new ReadEntity(this);
+ }
+ }
+}
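
As a quick illustration of the builder, a hedged fragment; the database,
table, and filter values are hypothetical. The filter string is what
HCatInputFormatReader (later in this commit) hands to
HCatInputFormat.setFilter:

    import org.apache.hcatalog.data.transfer.ReadEntity;

    public class ReadEntitySketch {
      public static ReadEntity filteredEntity() {
        return new ReadEntity.Builder()
            .withDatabase("default")       // hypothetical database
            .withTable("clicks")           // hypothetical table
            .withFilter("ds=\"20130906\"") // hypothetical partition filter
            .build();
      }
    }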
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * This class holds the {@link InputSplit}s obtained at the master node,
+ * along with the configuration. It implements {@link Externalizable} so it
+ * can be serialized using standard Java mechanisms.
+ */
+public class ReaderContext implements Externalizable, Configurable {
+
+ private static final long serialVersionUID = -2656468331739574367L;
+ private List<InputSplit> splits;
+ private Configuration conf;
+
+ public ReaderContext() {
+ this.splits = new ArrayList<InputSplit>();
+ this.conf = new Configuration();
+ }
+
+ public void setInputSplits(final List<InputSplit> splits) {
+ this.splits = splits;
+ }
+
+ public List<InputSplit> getSplits() {
+ return splits;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ out.writeInt(splits.size());
+ for (InputSplit split : splits) {
+ ((HCatSplit) split).write(out);
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ int numOfSplits = in.readInt();
+ for (int i = 0; i < numOfSplits; i++) {
+ HCatSplit split = new HCatSplit();
+ split.readFields(in);
+ splits.add(split);
+ }
+ }
+}
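
Because ReaderContext is Externalizable and has a public no-arg constructor,
plain Java object streams are enough to ship it from the master to the
slaves. A minimal round-trip sketch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.hcatalog.data.transfer.ReaderContext;

    public class ContextTransport {
      public static byte[] serialize(ReaderContext context) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(context);   // invokes ReaderContext.writeExternal
        out.close();
        return bytes.toByteArray();
      }

      public static ReaderContext deserialize(byte[] data)
          throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
        return (ReaderContext) in.readObject();   // no-arg ctor, then readExternal
      }
    }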
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class WriteEntity extends EntityBase.Entity {
+
+ /**
+ * Don't instantiate {@link WriteEntity} directly. Use {@link Builder} to
+ * build a {@link WriteEntity} instead.
+ */
+
+ private WriteEntity() {
+ // Not allowed.
+ }
+
+ private WriteEntity(Builder builder) {
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ }
+
+ /**
+ * This class should be used to build {@link WriteEntity}. It follows the
+ * builder pattern, letting you build a {@link WriteEntity} with whatever
+ * level of detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
+
+ public WriteEntity build() {
+ return new WriteEntity(this);
+ }
+
+ }
+}
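
For the write side, the builder looks the same but takes an explicit
partition map; the key and values here are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hcatalog.data.transfer.WriteEntity;

    public class WriteEntitySketch {
      public static WriteEntity partitionedEntity() {
        Map<String, String> partition = new HashMap<String, String>();
        partition.put("ds", "20130906");   // hypothetical partition key/value
        return new WriteEntity.Builder()
            .withDatabase("default")       // hypothetical database
            .withTable("clicks")           // hypothetical table
            .withPartition(partition)
            .build();
      }
    }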
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This holds information obtained at the master node to help prepare the
+ * slave nodes for writing. It implements {@link Externalizable} so it can be
+ * serialized using standard Java mechanisms. The master should serialize it
+ * and make it available to the slaves before they write.
+ */
+public class WriterContext implements Externalizable, Configurable {
+
+ private static final long serialVersionUID = -5899374262971611840L;
+ private Configuration conf;
+
+ public WriterContext() {
+ conf = new Configuration();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ }
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+
+/**
+ * This reader reads via {@link HCatInputFormat}.
+ */
+public class HCatInputFormatReader extends HCatReader {
+
+ private InputSplit split;
+
+ public HCatInputFormatReader(InputSplit split, Configuration config,
+ StateProvider sp) {
+ super(config, sp);
+ this.split = split;
+ }
+
+ public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
+ super(info, config);
+ }
+
+ @Override
+ public ReaderContext prepareRead() throws HCatException {
+ try {
+ Job job = new Job(conf);
+ HCatInputFormat hcif = HCatInputFormat.setInput(
+ job, re.getDbName(), re.getTableName()).setFilter(re.getFilterString());
+ ReaderContext cntxt = new ReaderContext();
+ cntxt.setInputSplits(hcif.getSplits(
+ HCatHadoopShims.Instance.get().createJobContext(job.getConfiguration(), null)));
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ @Override
+ public Iterator<HCatRecord> read() throws HCatException {
+
+ HCatInputFormat inpFmt = new HCatInputFormat();
+ RecordReader<WritableComparable, HCatRecord> rr;
+ try {
+ TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext(conf, new TaskAttemptID());
+ rr = inpFmt.createRecordReader(split, cntxt);
+ rr.initialize(split, cntxt);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ return new HCatRecordItr(rr);
+ }
+
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+ private RecordReader<WritableComparable, HCatRecord> curRecReader;
+
+ HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+ curRecReader = rr;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ boolean retVal = curRecReader.nextKeyValue();
+ if (retVal) {
+ return true;
+ }
+ // If it's false, we need to close the recordReader.
+ curRecReader.close();
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public HCatRecord next() {
+ try {
+ return curRecReader.getCurrentValue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Not allowed");
+ }
+ }
+}
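
And the slave-side half of the read flow: each slave rebuilds a reader from
the shipped ReaderContext and drains the iterator. The
getHCatReader(ReaderContext, int) factory overload is an assumption here.
Note that HCatRecordItr above closes its RecordReader only when hasNext()
first returns false, so iterate to exhaustion:

    import java.util.Iterator;

    import org.apache.hcatalog.common.HCatException;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatReader;
    import org.apache.hcatalog.data.transfer.ReaderContext;

    public class SlaveReadSketch {
      public static void readOnSlave(ReaderContext context, int slaveNumber)
          throws HCatException {
        // Assumed overload: builds a reader for the slaveNumber-th split.
        HCatReader reader = DataTransferFactory.getHCatReader(context, slaveNumber);
        Iterator<HCatRecord> records = reader.read();
        while (records.hasNext()) {
          HCatRecord record = records.next();
          // Process 'record'; the iterator closes the underlying RecordReader
          // once hasNext() returns false.
        }
      }
    }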
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+
+/**
+ * This writer writes via {@link HCatOutputFormat}.
+ */
+public class HCatOutputFormatWriter extends HCatWriter {
+
+ public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
+ super(we, config);
+ }
+
+ public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
+ super(config, sp);
+ }
+
+ @Override
+ public WriterContext prepareWrite() throws HCatException {
+ OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
+ we.getTableName(), we.getPartitionKVs());
+ Job job;
+ try {
+ job = new Job(conf);
+ HCatOutputFormat.setOutput(job, jobInfo);
+ HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ outFormat.checkOutputSpecs(job);
+ outFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (job.getConfiguration(), HCatHadoopShims.Instance.get().createTaskAttemptID())).setupJob(job);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ WriterContext cntxt = new WriterContext();
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ }
+
+ @Override
+ public void write(Iterator<HCatRecord> recordItr) throws HCatException {
+
+ int id = sp.getId();
+ setVarsInConf(id);
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (conf, new TaskAttemptID(HCatHadoopShims.Instance.get().createTaskID(), id));
+ OutputCommitter committer = null;
+ RecordWriter<WritableComparable<?>, HCatRecord> writer;
+ try {
+ committer = outFormat.getOutputCommitter(cntxt);
+ committer.setupTask(cntxt);
+ writer = outFormat.getRecordWriter(cntxt);
+ while (recordItr.hasNext()) {
+ HCatRecord rec = recordItr.next();
+ writer.write(null, rec);
+ }
+ writer.close(cntxt);
+ if (committer.needsTaskCommit(cntxt)) {
+ committer.commitTask(cntxt);
+ }
+ } catch (IOException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
+ } catch (InterruptedException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
+ }
+ }
+
+ @Override
+ public void commit(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
+ .commitJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null));
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ @Override
+ public void abort(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+ (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
+ .abortJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null), State.FAILED);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ private void setVarsInConf(int id) {
+
+ // The following two config keys are required by FileOutputFormat to work
+ // correctly. In the usual Hadoop case the JobTracker sets these before
+ // launching tasks; since there is no JobTracker here, we set them
+ // ourselves.
+ conf.setInt("mapred.task.partition", id);
+ conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
+ }
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+import java.text.NumberFormat;
+import java.util.Random;
+
+public class DefaultStateProvider implements StateProvider {
+
+ /**
+ * Default implementation. Here, ids are generated randomly.
+ */
+ @Override
+ public int getId() {
+
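+ // The format/parse round trip pads to at least five digits, but
+ // Integer.parseInt drops any leading zeros again, so the net effect is
+ // Math.abs(new Random().nextInt()).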
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+ return Integer
+ .parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
+ }
+
+ private static StateProvider sp;
+
+ public static synchronized StateProvider get() {
+ if (null == sp) {
+ sp = new DefaultStateProvider();
+ }
+ return sp;
+ }
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+/**
+ * If an external system wants to communicate any state to the slaves, it can
+ * do so via this interface. One example of this, in the case of MapReduce,
+ * is the ids the JobTracker assigns to TaskTrackers.
+ */
+public interface StateProvider {
+
+ /**
+ * This method should return the id assigned to the slave node.
+ *
+ * @return id
+ */
+ public int getId();
+}
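
A minimal sketch of a custom provider, assuming the external system already
knows each slave's id (for example, one handed out by its own scheduler):

    import org.apache.hcatalog.data.transfer.state.StateProvider;

    public class FixedIdStateProvider implements StateProvider {
      private final int slaveId;

      public FixedIdStateProvider(int slaveId) {
        this.slaveId = slaveId;   // id assigned by the external system
      }

      @Override
      public int getId() {
        return slaveId;
      }
    }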
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.har;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.tools.HadoopArchives;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+
+public class HarOutputCommitterPostProcessor {
+
+ boolean isEnabled = false;
+
+ public boolean isEnabled() {
+ return isEnabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.isEnabled = enabled;
+ }
+
+
+ public void exec(JobContext context, Partition partition, Path partPath) throws IOException {
+// LOG.info("Archiving partition ["+partPath.toString()+"]");
+ makeHar(context, partPath.toUri().toString(), harFile(partPath));
+ partition.getParameters().put(hive_metastoreConstants.IS_ARCHIVED, "true");
+ }
+
+ public String harFile(Path ptnPath) throws IOException {
+ String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har";
+// LOG.info("har file : " + harFile);
+ return harFile;
+ }
+
+ public String getParentFSPath(Path ptnPath) throws IOException {
+ return ptnPath.toUri().getPath().replaceFirst("/+$", "");
+ }
+
+ public String getProcessedLocation(Path ptnPath) throws IOException {
+ String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR;
+// LOG.info("har location : " + harLocn);
+ return harLocn;
+ }
+
+
+ /**
+ * Creates a har file from the contents of a given directory, using that
+ * directory as the archive root.
+ * @param context The current job context
+ * @param dir Directory to archive
+ * @param harFile The HAR file to create
+ */
+ public static void makeHar(JobContext context, String dir, String harFile) throws IOException {
+// Configuration conf = context.getConfiguration();
+// Credentials creds = context.getCredentials();
+
+// HCatUtil.logAllTokens(LOG,context);
+
+ int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR);
+ Path archivePath = new Path(harFile.substring(0, lastSep));
+ final String[] args = {
+ "-archiveName",
+ harFile.substring(lastSep + 1, harFile.length()),
+ "-p",
+ dir,
+ "*",
+ archivePath.toString()
+ };
+// for (String arg : args){
+// LOG.info("Args to har : "+ arg);
+// }
+ try {
+ Configuration newConf = new Configuration();
+ FileSystem fs = archivePath.getFileSystem(newConf);
+
+ String hadoopTokenFileLocationEnvSetting = System.getenv(HCatConstants.SYSENV_HADOOP_TOKEN_FILE_LOCATION);
+ if ((hadoopTokenFileLocationEnvSetting != null) && (!hadoopTokenFileLocationEnvSetting.isEmpty())) {
+ newConf.set(HCatConstants.CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocationEnvSetting);
+// LOG.info("System.getenv(\"HADOOP_TOKEN_FILE_LOCATION\") =["+ System.getenv("HADOOP_TOKEN_FILE_LOCATION")+"]");
+ }
+// for (FileStatus ds : fs.globStatus(new Path(dir, "*"))){
+// LOG.info("src : "+ds.getPath().toUri().toString());
+// }
+
+ final HadoopArchives har = new HadoopArchives(newConf);
+ int rc = ToolRunner.run(har, args);
+ if (rc != 0) {
+ throw new Exception("Har returned error code " + rc);
+ }
+
+// for (FileStatus hs : fs.globStatus(new Path(harFile, "*"))){
+// LOG.info("dest : "+hs.getPath().toUri().toString());
+// }
+// doHarCheck(fs,harFile);
+// LOG.info("Nuking " + dir);
+ fs.delete(new Path(dir), true);
+ } catch (Exception e) {
+ throw new HCatException("Error creating Har [" + harFile + "] from [" + dir + "]", e);
+ }
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Part of the DefaultOutput*Container classes.
+ * See {@link DefaultOutputFormatContainer} for more information.
+ */
+class DefaultOutputCommitterContainer extends OutputCommitterContainer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultOutputCommitterContainer.class);
+
+ /**
+ * @param context current JobContext
+ * @param baseCommitter OutputCommitter to contain
+ * @throws IOException
+ */
+ public DefaultOutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
+ super(context, baseCommitter);
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+ return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ getBaseOutputCommitter().setupTask(HCatMapRedUtil.createTaskAttemptContext(context));
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, State state) throws IOException {
+ getBaseOutputCommitter().abortJob(HCatMapRedUtil.createJobContext(jobContext), state);
+ cleanupJob(jobContext);
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
+ cleanupJob(jobContext);
+ }
+
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
+
+ //Cancel HCat and JobTracker tokens
+ HiveMetaStoreClient client = null;
+ try {
+ HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
+ client = HCatUtil.getHiveClient(hiveConf);
+ String tokenStrForm = client.getTokenStrForm();
+ if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ client.cancelDelegationToken(tokenStrForm);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to cancel delegation token", e);
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
+ }
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * Bare-bones implementation of OutputFormatContainer. It does only what is
+ * required to work properly with HCatalog. HCatalog features that require a
+ * storage-specific implementation, such as partitioning, are unsupported.
+ */
+class DefaultOutputFormatContainer extends OutputFormatContainer {
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
+ super(of);
+ }
+
+ static synchronized String getOutputName(int partition) {
+ return "part-" + NUMBER_FORMAT.format(partition);
+ }
+
+ /**
+ * Get the record writer for the job. Uses the storage handler's OutputFormat
+ * to get the record writer.
+ * @param context the information about the current task.
+ * @return a RecordWriter to write the output for the job.
+ * @throws IOException
+ */
+ @Override
+ public RecordWriter<WritableComparable<?>, HCatRecord>
+ getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
+ return new DefaultRecordWriterContainer(context,
+ getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
+ }
+
+
+ /**
+ * Get the output committer for this output format. This is responsible
+ * for ensuring the output is committed correctly.
+ * @param context the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new DefaultOutputCommitterContainer(context, new JobConf(context.getConfiguration()).getOutputCommitter());
+ }
+
+ /**
+ * Check for validity of the output-specification for the job.
+ * @param context information about the job
+ * @throws IOException when output should not be attempted
+ */
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
+ JobConf jobConf = new JobConf(context.getConfiguration());
+ outputFormat.checkOutputSpecs(null, jobConf);
+ HCatUtil.copyConf(jobConf, context.getConfiguration());
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * Part of the DefaultOutput*Container classes.
+ * See {@link DefaultOutputFormatContainer} for more information.
+ */
+class DefaultRecordWriterContainer extends RecordWriterContainer {
+
+ private final HCatStorageHandler storageHandler;
+ private final SerDe serDe;
+ private final OutputJobInfo jobInfo;
+ private final ObjectInspector hcatRecordOI;
+
+ /**
+ * @param context current JobContext
+ * @param baseRecordWriter RecordWriter to contain
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public DefaultRecordWriterContainer(TaskAttemptContext context,
+ org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) throws IOException, InterruptedException {
+ super(context, baseRecordWriter);
+ jobInfo = HCatOutputFormat.getJobInfo(context);
+ storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+ HCatOutputFormat.configureOutputStorageHandler(context);
+ serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
+ hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
+ try {
+ InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
+ } catch (SerDeException e) {
+ throw new IOException("Failed to initialize SerDe", e);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ getBaseRecordWriter().close(InternalUtil.createReporter(context));
+ }
+
+ @Override
+ public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
+ InterruptedException {
+ try {
+ getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI));
+ } catch (SerDeException e) {
+ throw new IOException("Failed to serialize object", e);
+ }
+ }
+
+}