You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:08 UTC
[68/92] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..da46690
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapred.JobConf;
+
+
+public class HadoopInputSplit implements InputSplit {
+
+ private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
+ private JobConf jobConf;
+ private int splitNumber;
+ private String hadoopInputSplitTypeName;
+
+
+ public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
+ return hadoopInputSplit;
+ }
+
+
+ public HadoopInputSplit() {
+ super();
+ }
+
+
+ public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+ this.hadoopInputSplit = hInputSplit;
+ this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
+ this.jobConf = jobconf;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(splitNumber);
+ out.writeUTF(hadoopInputSplitTypeName);
+ hadoopInputSplit.write(out);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.splitNumber=in.readInt();
+ this.hadoopInputSplitTypeName = in.readUTF();
+ if(hadoopInputSplit == null) {
+ try {
+ Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+ Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+ this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unable to create InputSplit", e);
+ }
+ }
+ this.hadoopInputSplit.readFields(in);
+ }
+
+ @Override
+ public int getSplitNumber() {
+ return this.splitNumber;
+ }
+
+ public void setSplitNumber(int splitNumber) {
+ this.splitNumber = splitNumber;
+ }
+
+ public void setHadoopInputSplit(
+ org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
+ this.hadoopInputSplit = hadoopInputSplit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
new file mode 100644
index 0000000..cf12cae
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.types.TypeInformation;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Flink {@link InputFormat} that reads through a Hadoop {@code mapreduce} (new API)
+ * {@link org.apache.hadoop.mapreduce.InputFormat} and emits every key/value pair as a {@link Tuple2}.
+ *
+ * <p>Java serialization is customized (see {@code writeObject}/{@code readObject}). Only the class names
+ * of the input format and the key/value types are shipped, together with the Hadoop configuration. The
+ * input format is re-instantiated reflectively on the receiving side, so it needs a public nullary constructor.
+ *
+ * @param <K> Hadoop key type
+ * @param <V> Hadoop value type
+ */
+public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
+
+    private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
+    private Class<K> keyClass;
+    private Class<V> valueClass;
+    private org.apache.hadoop.conf.Configuration configuration;
+
+    // Per-split reading state. It is runtime-only and is never written by writeObject.
+    private transient RecordReader<K, V> recordReader;
+    /** True when {@link #hasNext} holds the result of a nextKeyValue() call not yet consumed. */
+    private transient boolean fetched = false;
+    private transient boolean hasNext;
+
+    /** Nullary constructor; the format and configuration must be set before use. */
+    public HadoopInputFormat() {
+        super();
+    }
+
+    /**
+     * @param mapreduceInputFormat the Hadoop input format to wrap
+     * @param key the key class produced by the input format
+     * @param value the value class produced by the input format
+     * @param job job whose configuration is used; the global Hadoop configuration is merged into it
+     */
+    public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+        super();
+        this.mapreduceInputFormat = mapreduceInputFormat;
+        this.keyClass = key;
+        this.valueClass = value;
+        this.configuration = job.getConfiguration();
+        HadoopUtils.mergeHadoopConf(configuration);
+    }
+
+    public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
+        return this.mapreduceInputFormat;
+    }
+
+    public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
+        this.mapreduceInputFormat = mapreduceInputFormat;
+    }
+
+    public org.apache.hadoop.conf.Configuration getConfiguration() {
+        return this.configuration;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  InputFormat
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void configure(Configuration parameters) {
+        // nothing to do
+    }
+
+    /**
+     * Computes size statistics for Hadoop {@link FileInputFormat}s; other formats get {@code null}.
+     * Statistics are best-effort: I/O and other failures are logged and {@code null} is returned.
+     */
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+        // only gather base statistics for FileInputFormats
+        if (!(mapreduceInputFormat instanceof FileInputFormat)) {
+            return null;
+        }
+
+        JobContext jobContext;
+        try {
+            jobContext = HadoopUtils.instantiateJobContext(configuration, null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        // instanceof is false for null, so no separate null check is needed
+        final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
+                (FileBaseStatistics) cachedStats : null;
+
+        try {
+            final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
+            return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+        } catch (IOException ioex) {
+            if (LOG.isWarnEnabled()) {
+                LOG.warn("Could not determine statistics due to an io error: "
+                        + ioex.getMessage());
+            }
+        } catch (Throwable t) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Unexpected problem while getting the file statistics: "
+                        + t.getMessage(), t);
+            }
+        }
+
+        // no statistics available
+        return null;
+    }
+
+    /** Asks the wrapped Hadoop format for its splits and wraps each one in a {@link HadoopInputSplit}. */
+    @Override
+    public HadoopInputSplit[] createInputSplits(int minNumSplits)
+            throws IOException {
+        // NOTE(review): this key is a minimum split *size* (bytes), yet it is set to the minimum
+        // *number* of splits. Kept unchanged for compatibility; confirm whether this is intended.
+        configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+        JobContext jobContext;
+        try {
+            jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        List<org.apache.hadoop.mapreduce.InputSplit> splits;
+        try {
+            splits = this.mapreduceInputFormat.getSplits(jobContext);
+        } catch (InterruptedException e) {
+            // restore the interrupt status so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IOException("Could not get Splits.", e);
+        }
+        HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+        for (int i = 0; i < hadoopInputSplits.length; i++) {
+            hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext);
+        }
+        return hadoopInputSplits;
+    }
+
+    @Override
+    public Class<? extends HadoopInputSplit> getInputSplitType() {
+        return HadoopInputSplit.class;
+    }
+
+    /** Creates and initializes a Hadoop {@link RecordReader} for the given split. */
+    @Override
+    public void open(HadoopInputSplit split) throws IOException {
+        TaskAttemptContext context;
+        try {
+            context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        try {
+            this.recordReader = this.mapreduceInputFormat
+                    .createRecordReader(split.getHadoopInputSplit(), context);
+            this.recordReader.initialize(split.getHadoopInputSplit(), context);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Could not create RecordReader.", e);
+        } finally {
+            // a freshly opened split has no pre-fetched record
+            this.fetched = false;
+        }
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+        if (!this.fetched) {
+            fetchNext();
+        }
+        return !this.hasNext;
+    }
+
+    /** Advances the reader by one record and remembers whether a record was available. */
+    private void fetchNext() throws IOException {
+        try {
+            this.hasNext = this.recordReader.nextKeyValue();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Could not fetch next KeyValue pair.", e);
+        } finally {
+            this.fetched = true;
+        }
+    }
+
+    /**
+     * Fills {@code record} with the current key and value.
+     *
+     * @return the filled {@code record}, or {@code null} when the split is exhausted
+     */
+    @Override
+    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+        if (!this.fetched) {
+            fetchNext();
+        }
+        if (!this.hasNext) {
+            return null;
+        }
+        try {
+            record.f0 = this.recordReader.getCurrentKey();
+            record.f1 = this.recordReader.getCurrentValue();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Could not get KeyValue pair.", e);
+        }
+        // the pair has been handed out; the next call must advance the reader
+        this.fetched = false;
+
+        return record;
+    }
+
+    /** Closes the current reader, if any. Safe to call when {@link #open} failed or was never called. */
+    @Override
+    public void close() throws IOException {
+        if (this.recordReader != null) {
+            try {
+                this.recordReader.close();
+            } finally {
+                this.recordReader = null;
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Helper methods
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Collects the files under {@code hadoopFilePaths} (one directory level deep) into {@code files}.
+     * Returns {@code cachedStats} if no file is newer than it. Otherwise returns fresh statistics with
+     * the summed length, or {@link BaseStatistics#SIZE_UNKNOWN} if that sum is not positive.
+     */
+    private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+            ArrayList<FileStatus> files) throws IOException {
+
+        long latestModTime = 0L;
+
+        // get the file info and check whether the cached statistics are still valid.
+        for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+
+            final Path filePath = new Path(hadoopPath.toUri());
+            final FileSystem fs = FileSystem.get(filePath.toUri());
+
+            final FileStatus file = fs.getFileStatus(filePath);
+            latestModTime = Math.max(latestModTime, file.getModificationTime());
+
+            // enumerate all files and check their modification time stamp.
+            if (file.isDir()) {
+                FileStatus[] fss = fs.listStatus(filePath);
+                files.ensureCapacity(files.size() + fss.length);
+
+                for (FileStatus s : fss) {
+                    if (!s.isDir()) {
+                        files.add(s);
+                        latestModTime = Math.max(s.getModificationTime(), latestModTime);
+                    }
+                }
+            } else {
+                files.add(file);
+            }
+        }
+
+        // check whether the cached statistics are still valid, if we have any
+        if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
+            return cachedStats;
+        }
+
+        // calculate the whole length
+        long len = 0;
+        for (FileStatus s : files) {
+            len += s.getLen();
+        }
+
+        // sanity check
+        if (len <= 0) {
+            len = BaseStatistics.SIZE_UNKNOWN;
+        }
+
+        return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Custom serialization methods
+    // --------------------------------------------------------------------------------------------
+
+    /** Writes the format/key/value class names followed by the Hadoop configuration. */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.writeUTF(this.mapreduceInputFormat.getClass().getName());
+        out.writeUTF(this.keyClass.getName());
+        out.writeUTF(this.valueClass.getName());
+        this.configuration.write(out);
+    }
+
+    /** Counterpart of {@link #writeObject}; resolves classes through the context class loader. */
+    @SuppressWarnings("unchecked")
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        String hadoopInputFormatClassName = in.readUTF();
+        String keyClassName = in.readUTF();
+        String valueClassName = in.readUTF();
+
+        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+        configuration.readFields(in);
+
+        if (this.configuration == null) {
+            this.configuration = configuration;
+        }
+
+        try {
+            this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+        }
+        try {
+            this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to find key class.", e);
+        }
+        try {
+            this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to find value class.", e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  ResultTypeQueryable
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public TypeInformation<Tuple2<K,V>> getProducedType() {
+        return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>(keyClass), new WritableTypeInfo<V>(valueClass));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
new file mode 100644
index 0000000..9eabc03
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+
+/**
+ * Flink {@link OutputFormat} that writes {@link Tuple2} records through a Hadoop {@code mapreduce}
+ * (new API) {@link org.apache.hadoop.mapreduce.OutputFormat}, using a {@link FileOutputCommitter}
+ * rooted at {@code mapred.output.dir}.
+ *
+ * <p>Java serialization is customized: only the output format's class name and the Hadoop
+ * configuration are shipped. The format is re-instantiated reflectively, so it needs a public
+ * nullary constructor.
+ */
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Matches the temporary part files written under the basename "tmp" configured in {@link #open}.
+     * Group 2 is the numeric part suffix; presumably names look like {@code tmp-r-00001} — confirm
+     * against the Hadoop version in use.
+     */
+    private static final Pattern TMP_FILE_PATTERN = Pattern.compile("tmp-(.)-([0-9]+)");
+
+    /** Hadoop task ids carry a fixed-width, zero-padded task number of this many digits. */
+    private static final int TASK_ID_DIGITS = 6;
+
+    private org.apache.hadoop.conf.Configuration configuration;
+    private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
+    private transient RecordWriter<K,V> recordWriter;
+    private transient FileOutputCommitter fileOutputCommitter;
+    private transient TaskAttemptContext context;
+
+    /**
+     * @param mapreduceOutputFormat the Hadoop output format to wrap
+     * @param job job whose configuration is used; the global Hadoop configuration is merged into it
+     */
+    public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
+        super();
+        this.mapreduceOutputFormat = mapreduceOutputFormat;
+        this.configuration = job.getConfiguration();
+        HadoopUtils.mergeHadoopConf(configuration);
+    }
+
+    public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    public org.apache.hadoop.conf.Configuration getConfiguration() {
+        return this.configuration;
+    }
+
+    public org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() {
+        return this.mapreduceOutputFormat;
+    }
+
+    public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) {
+        this.mapreduceOutputFormat = mapreduceOutputFormat;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  OutputFormat
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void configure(Configuration parameters) {
+        // nothing to do
+    }
+
+    /**
+     * Creates the temporary output file for the Hadoop RecordWriter.
+     *
+     * @param taskNumber The number of the parallel instance (0-based; the Hadoop task id uses taskNumber + 1).
+     * @param numTasks The number of parallel tasks.
+     * @throws IOException if the 1-based task number has more than six digits, or the RecordWriter cannot be created
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        final String taskId = Integer.toString(taskNumber + 1);
+        if (taskId.length() > TASK_ID_DIGITS) {
+            throw new IOException("Task id too large.");
+        }
+
+        // for hadoop 2.2
+        this.configuration.set("mapreduce.output.basename", "tmp");
+
+        // Zero-pad the task number to exactly six digits, e.g. attempt__0000_r_000001_0.
+        // Padding by hand keeps the digits ASCII and also works for six-digit ids.
+        StringBuilder attemptName = new StringBuilder("attempt__0000_r_");
+        for (int i = taskId.length(); i < TASK_ID_DIGITS; i++) {
+            attemptName.append('0');
+        }
+        attemptName.append(taskId).append("_0");
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(attemptName.toString());
+
+        try {
+            this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        this.configuration.set("mapred.task.id", taskAttemptID.toString());
+        // for hadoop 2.2
+        this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+
+        this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
+
+        try {
+            this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+        this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+
+        try {
+            this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+        } catch (InterruptedException e) {
+            // restore the interrupt status so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IOException("Could not create RecordWriter.", e);
+        }
+    }
+
+
+    @Override
+    public void writeRecord(Tuple2<K, V> record) throws IOException {
+        try {
+            this.recordWriter.write(record.f0, record.f1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Could not write Record.", e);
+        }
+    }
+
+    /**
+     * Commits the task by moving the output file out of the temporary directory, then renames
+     * every matching {@code tmp-?-NNN} file in the output directory to its bare part number.
+     *
+     * <p>NOTE(review): every parallel instance calls {@code commitJob} and renames *all* matching
+     * files, not only its own. Concurrent instances may race on this. The rename result is
+     * deliberately not checked, because another instance may already have moved the file.
+     *
+     * @throws IOException if closing the writer, committing, or listing the output fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            this.recordWriter.close(this.context);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Could not close RecordWriter.", e);
+        }
+
+        if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+            this.fileOutputCommitter.commitTask(this.context);
+        }
+        this.fileOutputCommitter.commitJob(this.context);
+
+        // rename tmp-* files to final name
+        Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+        // resolve the file system from the output path itself; it need not be the default file system
+        FileSystem fs = outputPath.getFileSystem(this.configuration);
+
+        // isDirectory does not work in hadoop 1
+        if (fs.getFileStatus(outputPath).isDir()) {
+            for (FileStatus f : fs.listStatus(outputPath)) {
+                Matcher m = TMP_FILE_PATTERN.matcher(f.getPath().getName());
+                if (m.matches()) {
+                    int part = Integer.parseInt(m.group(2));
+                    fs.rename(f.getPath(), new Path(outputPath.toString() + "/" + part));
+                }
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Custom serialization methods
+    // --------------------------------------------------------------------------------------------
+
+    /** Writes the output format's class name followed by the Hadoop configuration. */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
+        this.configuration.write(out);
+    }
+
+    /** Counterpart of {@link #writeObject}; resolves the format class through the context class loader. */
+    @SuppressWarnings("unchecked")
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        String hadoopOutputFormatClassName = in.readUTF();
+
+        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+        configuration.readFields(in);
+
+        if (this.configuration == null) {
+            this.configuration = configuration;
+        }
+
+        try {
+            this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
new file mode 100644
index 0000000..36ea378
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapreduce.example;
+
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
+
+/**
+ * Word-count example that reads a text file through a Hadoop {@code TextInputFormat}, counts how
+ * often each lower-cased word occurs, and writes "word count" lines through a Hadoop
+ * {@code TextOutputFormat}.
+ *
+ * It demonstrates three things: using Hadoop Input Formats, converting Hadoop Writables to
+ * plain Java types inside a Flink job, and converting them back for a Hadoop Output Format.
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+
+    /**
+     * Entry point.
+     *
+     * @param args {@code <input path> <result path>}; with fewer arguments a usage line is printed
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length >= 2) {
+            runJob(args[0], args[1]);
+        } else {
+            System.err.println("Usage: WordCount <input path> <result path>");
+        }
+    }
+
+    /** Builds and executes the word-count program for the given input and output locations. */
+    private static void runJob(String inputPath, String outputPath) throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setDegreeOfParallelism(1);
+
+        // a single Hadoop Job object supplies the configuration for both the input and the output format
+        Job job = Job.getInstance();
+        HadoopInputFormat<LongWritable, Text> inputFormat =
+                new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
+        TextInputFormat.addInputPath(job, new Path(inputPath));
+
+        // read lines -> (word, 1) -> summed per word -> back to Hadoop Writables
+        DataSet<Tuple2<Text, IntWritable>> counts = env.createInput(inputFormat)
+                .flatMap(new Tokenizer())
+                .groupBy(0)
+                .aggregate(Aggregations.SUM, 1)
+                .map(new HadoopDatatypeMapper());
+
+        HadoopOutputFormat<Text, IntWritable> outputFormat =
+                new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
+        // Set the separator under both keys, because this example is run with both the
+        // hadoop1 and the hadoop2 profile.
+        outputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+        outputFormat.getConfiguration().set("mapred.textoutputformat.separator", " ");
+        TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+        counts.output(outputFormat);
+        env.execute("Word Count");
+    }
+
+    /**
+     * Lower-cases each line, splits it on runs of non-word characters, and emits a
+     * {@code (word, 1)} pair for every non-empty token.
+     */
+    public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+
+        @Override
+        public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
+            for (String word : value.f1.toString().toLowerCase().split("\\W+")) {
+                // split() can yield a leading empty token when the line starts with a separator
+                if (!word.isEmpty()) {
+                    out.collect(new Tuple2<String, Integer>(word, 1));
+                }
+            }
+        }
+    }
+
+    /** Wraps a {@code (String, Integer)} pair into the {@code (Text, IntWritable)} Hadoop types. */
+    public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+
+        @Override
+        public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
+            Text word = new Text(value.f0);
+            IntWritable count = new IntWritable(value.f1);
+            return new Tuple2<Text, IntWritable>(word, count);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
new file mode 100644
index 0000000..eadbd0b
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapreduce.utils;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Helpers for working with the Hadoop {@code mapreduce} API across Hadoop 1.x and 2.x.
+ */
+public class HadoopUtils {
+
+ /**
+  * Merges the Hadoop configuration returned by {@link DistributedFileSystem#getHadoopConfiguration()}
+  * into the given configuration. This is necessary for the HDFS configuration.
+  *
+  * @param configuration the configuration to merge into; every entry of the Hadoop configuration
+  *        is {@code set} on it, modifying it in place
+  */
+ public static void mergeHadoopConf(Configuration configuration) {
+     Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
+
+     for (Map.Entry<String, String> e : hadoopConf) {
+         configuration.set(e.getKey(), e.getValue());
+     }
+ }
+
+ /**
+  * Creates a {@link JobContext} for both Hadoop 1.x and Hadoop 2.x.
+  *
+  * <p>In Hadoop 2.x {@code org.apache.hadoop.mapreduce.JobContext} is an interface implemented by
+  * {@code org.apache.hadoop.mapreduce.task.JobContextImpl}; in Hadoop 1.x it is a concrete class.
+  * The class to instantiate is therefore chosen and constructed reflectively.
+  *
+  * @param configuration the Hadoop configuration of the context
+  * @param jobId the id of the job
+  * @return the new job context
+  * @throws Exception if the context cannot be created; the underlying failure is attached as cause
+  */
+ public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
+     try {
+         Class<?> clazz;
+         // for Hadoop 2.xx: JobContext is an interface, instantiate its implementation
+         if (JobContext.class.isInterface()) {
+             clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
+         }
+         // for Hadoop 1.xx: JobContext is a concrete class
+         else {
+             clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
+         }
+         Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
+         return (JobContext) constructor.newInstance(configuration, jobId);
+     } catch (Exception e) {
+         // keep the root cause (missing class, missing constructor, ...) for diagnosis
+         throw new Exception("Could not create instance of JobContext.", e);
+     }
+ }
+
+ /**
+  * Creates a {@link TaskAttemptContext} for both Hadoop 1.x and Hadoop 2.x.
+  *
+  * <p>In Hadoop 2.x the context types are interfaces implemented by
+  * {@code org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl}; in Hadoop 1.x
+  * {@code org.apache.hadoop.mapreduce.TaskAttemptContext} is a concrete class.
+  *
+  * @param configuration the Hadoop configuration of the context
+  * @param taskAttemptID the id of the task attempt
+  * @return the new task attempt context
+  * @throws Exception if the context cannot be created; the underlying failure is attached as cause
+  */
+ public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception {
+     try {
+         Class<?> clazz;
+         // NOTE(review): unlike instantiateJobContext, this resolves the class through the caller's
+         // class loader rather than the thread context class loader; confirm which one is intended.
+         // for Hadoop 2.xx: the context types are interfaces, instantiate the implementation
+         if (JobContext.class.isInterface()) {
+             clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+         }
+         // for Hadoop 1.xx: TaskAttemptContext is a concrete class
+         else {
+             clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
+         }
+         Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
+         return (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
+     } catch (Exception e) {
+         // keep the root cause (missing class, missing constructor, ...) for diagnosis
+         throw new Exception("Could not create instance of TaskAttemptContext.", e);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..b53fc9f
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapreduce.JobContext;
+
+
+/**
+ * Wraps a Hadoop {@code mapreduce} input split so that it can be handled as a Flink {@link InputSplit}.
+ *
+ * <p>The wrapped split must implement {@link Writable}. It is serialized as the split number, the
+ * split's class name and the split's own {@link Writable} representation. The {@link JobContext}
+ * is transient and is not restored by {@link #read(DataInputView)}.
+ */
+public class HadoopInputSplit implements InputSplit {
+
+ public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
+ public transient JobContext jobContext;
+
+ private int splitNumber;
+
+ /**
+  * Returns the wrapped Hadoop input split, or {@code null} if none has been set or read yet.
+  */
+ public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
+     return mapreduceInputSplit;
+ }
+
+ /**
+  * Creates an empty wrapper; the Hadoop split is instantiated when {@link #read(DataInputView)} is called.
+  */
+ public HadoopInputSplit() {
+     super();
+ }
+
+ /**
+  * Wraps the given Hadoop input split.
+  *
+  * @param mapreduceInputSplit the split to wrap; must implement {@link Writable}
+  * @param jobContext the job context associated with the split
+  * @throws IllegalArgumentException if the split does not implement {@link Writable}
+  */
+ public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+     if (!(mapreduceInputSplit instanceof Writable)) {
+         throw new IllegalArgumentException("InputSplit must implement Writable interface.");
+     }
+     this.mapreduceInputSplit = mapreduceInputSplit;
+     this.jobContext = jobContext;
+ }
+
+ /**
+  * Writes the split number, the class name of the wrapped split and the split itself.
+  *
+  * @throws IllegalStateException if no Hadoop split is set
+  */
+ @Override
+ public void write(DataOutputView out) throws IOException {
+     if (this.mapreduceInputSplit == null) {
+         // fail before writing anything, with a descriptive message instead of a bare
+         // NullPointerException after the split number has already been written
+         throw new IllegalStateException("Cannot serialize HadoopInputSplit: no Hadoop InputSplit is set.");
+     }
+     out.writeInt(this.splitNumber);
+     out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+     Writable w = (Writable) this.mapreduceInputSplit;
+     w.write(out);
+ }
+
+ /**
+  * Reads a split written by {@link #write(DataOutputView)}. If no Hadoop split is set yet, an
+  * instance of the serialized class is created first; otherwise the existing instance is reused.
+  *
+  * @throws RuntimeException if the serialized split class cannot be instantiated
+  */
+ @Override
+ public void read(DataInputView in) throws IOException {
+     this.splitNumber = in.readInt();
+     String className = in.readUTF();
+
+     if (this.mapreduceInputSplit == null) {
+         try {
+             Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+                     Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+             this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+         } catch (Exception e) {
+             throw new RuntimeException("Unable to create InputSplit", e);
+         }
+     }
+     ((Writable) this.mapreduceInputSplit).readFields(in);
+ }
+
+ @Override
+ public int getSplitNumber() {
+     return this.splitNumber;
+ }
+
+ public void setSplitNumber(int splitNumber) {
+     this.splitNumber = splitNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
new file mode 100644
index 0000000..d13d0f2
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.hadoopcompatibility.mapred.example.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs the {@code mapred}-based Hadoop-compatibility {@link WordCount}
+ * example on {@link WordCountData#TEXT} and compares its output with {@link WordCountData#COUNTS}.
+ */
+public class HadoopInputOutputITCase extends JavaProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+     this.textPath = createTempFile("text.txt", WordCountData.TEXT);
+     this.resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+     final String[] programArgs = { this.textPath, this.resultPath };
+     WordCount.main(programArgs);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+     // the result is read from the file "1" inside the result directory
+     final String outputFile = this.resultPath + "/1";
+     compareResultsByLinesInMemory(WordCountData.COUNTS, outputFile);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
new file mode 100644
index 0000000..547ea60
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapred.record;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.RecordAPITestBase;
+
+/**
+ * Tests the Hadoop InputFormat and OutputFormat wrappers of the Record API by running the
+ * {@link WordCountWithOutputFormat} example, which reads with Hadoop's TextInputFormat and
+ * writes with Hadoop's TextOutputFormat.
+ */
+public class HadoopRecordInputOutputITCase extends RecordAPITestBase {
+
+ protected String textPath;
+ protected String resultPath;
+ protected String counts;
+
+ @Override
+ protected void preSubmit() throws Exception {
+     this.textPath = createTempFile("text.txt", WordCountData.TEXT);
+     this.resultPath = getTempDirPath("result");
+     // the reference counts are space separated; the expected output uses tabs instead
+     // (presumably TextOutputFormat's default key/value separator)
+     this.counts = WordCountData.COUNTS.replaceAll(" ", "\t");
+ }
+
+ @Override
+ protected Plan getTestJob() {
+     return new WordCountWithOutputFormat().getPlan("1", this.textPath, this.resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+     // the result is read from the file "1" inside the result directory
+     final String outputFile = this.resultPath + "/1";
+     compareResultsByLinesInMemory(this.counts, outputFile);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
new file mode 100644
index 0000000..10dab3f
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs the {@code mapreduce}-based Hadoop-compatibility {@link WordCount}
+ * example on {@link WordCountData#TEXT} and compares its output with {@link WordCountData#COUNTS}.
+ */
+public class HadoopInputOutputITCase extends JavaProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+     this.textPath = createTempFile("text.txt", WordCountData.TEXT);
+     this.resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+     final String[] programArgs = { this.textPath, this.resultPath };
+     WordCount.main(programArgs);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+     // the result is read from the file "1" inside the result directory
+     final String outputFile = this.resultPath + "/1";
+     compareResultsByLinesInMemory(WordCountData.COUNTS, outputFile);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml
new file mode 100644
index 0000000..c973212
--- /dev/null
+++ b/flink-addons/flink-hbase/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-addons</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <!-- Cloudera release repository; presumably needed to resolve the CDH build of HBase
+ (0.94.2-cdh4.2.1) declared in the dependencies below. -->
+ <repositories>
+ <repository>
+ <id>cloudera-releases</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <!-- NOTE(review): hbase.version is only referenced by the commented-out hbase-server /
+ hbase-client dependencies; the active "hbase" dependency hardcodes 0.94.2-cdh4.2.1.
+ Confirm whether this property is still needed. -->
+ <properties>
+ <hbase.version>0.96.0-hadoop2</hbase.version>
+ </properties>
+
+ <artifactId>flink-hbase</artifactId>
+ <name>flink-hbase</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- HBase, CDH 4.2.1 build. NOTE(review): the version is hardcoded here and is not taken
+ from the hbase.version property. -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.94.2-cdh4.2.1</version>
+ <exclusions>
+ <!-- jruby is used for the hbase shell. -->
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <!-- Disabled alternative: the separate hbase-server / hbase-client artifacts at the
+ version given by the hbase.version property. -->
+ <!-- <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ -->
+
+ <!-- The hadoop-client dependency above is available for yarn and non-yarn, so there is no need
+ to use profiles. See ticket https://issues.apache.org/jira/browse/HADOOP-8009
+ for a description of hadoop-client. -->
+
+ <reporting>
+ <plugins>
+ </plugins>
+ </reporting>
+
+ <build>
+ <plugins>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
new file mode 100644
index 0000000..9029030
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public abstract class GenericTableOutputFormat implements OutputFormat<Record> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String JT_ID_KEY = "pact.hbase.jtkey";
+
+ public static final String JOB_ID_KEY = "pact.job.id";
+
+ private RecordWriter<ImmutableBytesWritable, KeyValue> writer;
+
+ private Configuration config;
+
+ private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+ private TaskAttemptContext context;
+
+ private String jtID;
+
+ private int jobId;
+
+
+ @Override
+ public void configure(Configuration parameters) {
+ this.config = parameters;
+
+ // get the ID parameters
+ this.jtID = parameters.getString(JT_ID_KEY, null);
+ if (this.jtID == null) {
+ throw new RuntimeException("Missing JT_ID entry in hbase config.");
+ }
+ this.jobId = parameters.getInteger(JOB_ID_KEY, -1);
+ if (this.jobId < 0) {
+ throw new RuntimeException("Missing or invalid job id in input config.");
+ }
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ this.hadoopConfig = getHadoopConfig(this.config);
+
+ /**
+ * PLASE NOTE:
+ * If you are a Eclipse+Maven Integration user and you have two (or more) warnings here, please
+ * close the pact-hbase project OR set the maven profile to hadoop_yarn
+ *
+ * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore,
+ * it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1)
+ */
+ final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);
+
+ this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
+ final HFileOutputFormat outFormat = new HFileOutputFormat();
+ try {
+ this.writer = outFormat.getRecordWriter(this.context);
+ } catch (InterruptedException iex) {
+ throw new IOException("Opening the writer was interrupted.", iex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ final RecordWriter<ImmutableBytesWritable, KeyValue> writer = this.writer;
+ this.writer = null;
+ if (writer != null) {
+ try {
+ writer.close(this.context);
+ } catch (InterruptedException iex) {
+ throw new IOException("Closing was interrupted.", iex);
+ }
+ }
+ }
+
+ public void collectKeyValue(KeyValue kv) throws IOException {
+ try {
+ this.writer.write(null, kv);
+ } catch (InterruptedException iex) {
+ throw new IOException("Write request was interrupted.", iex);
+ }
+ }
+
+ public abstract org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
new file mode 100644
index 0000000..ac41927
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+
+/**
+ * A sink for writing to HBase.
+ *
+ * <p>Sets a random job-tracker identifier and a random non-negative job id as parameters, which
+ * {@link GenericTableOutputFormat#configure} requires to build Hadoop task attempt ids.
+ */
+public class HBaseDataSink extends GenericDataSink {
+
+ private static final int IDENTIFIER_LEN = 16;
+
+ public HBaseDataSink(GenericTableOutputFormat f, Operator input, String name) {
+     super(f, input, name);
+
+     final Random rnd = new Random();
+
+     // generate a random identifier string of lower-case ASCII letters
+     final StringBuilder bld = new StringBuilder(IDENTIFIER_LEN);
+     for (int i = 0; i < IDENTIFIER_LEN; i++) {
+         bld.append((char) (rnd.nextInt(26) + 'a'));
+     }
+
+     setParameter(GenericTableOutputFormat.JT_ID_KEY, bld.toString());
+     // GenericTableOutputFormat.configure() rejects negative job ids, so draw the id from
+     // [0, Integer.MAX_VALUE); a plain nextInt() is negative about half of the time.
+     setParameter(GenericTableOutputFormat.JOB_ID_KEY, rnd.nextInt(Integer.MAX_VALUE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
new file mode 100644
index 0000000..9ff5af7
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.addons.hbase.common.HBaseKey;
+import org.apache.flink.addons.hbase.common.HBaseResult;
+import org.apache.flink.addons.hbase.common.HBaseUtil;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables.
+ */
+public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
+
+ /** A handle on an HBase table */
+ private HTable table;
+
+ /** The scanner that performs the actual access on the table. HBase object */
+ private Scan scan;
+
+ /** Hbase' iterator wrapper */
+ private TableRecordReader tableRecordReader;
+
+ /** helper variable to decide whether the input is exhausted or not */
+ private boolean endReached = false;
+
+ /** Job parameter that specifies the input table. */
+ public static final String INPUT_TABLE = "hbase.inputtable";
+
+ /** Location of the hbase-site.xml. If set, the HBaseAdmin will build inside */
+ public static final String CONFIG_LOCATION = "hbase.config.location";
+
+ /**
+ * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
+ * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
+ */
+ public static final String SCAN = "hbase.scan";
+
+ /** Column Family to Scan */
+ public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family";
+
+ /** Space delimited list of columns to scan. */
+ public static final String SCAN_COLUMNS = "hbase.scan.columns";
+
+ /** The timestamp used to filter columns with a specific timestamp. */
+ public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp";
+
+ /** The starting timestamp used to filter columns with a specific range of versions. */
+ public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start";
+
+ /** The ending timestamp used to filter columns with a specific range of versions. */
+ public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end";
+
+ /** The maximum number of version to return. */
+ public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions";
+
+ /** Set to false to disable server-side caching of blocks for this scan. */
+ public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks";
+
+ /** The number of rows for caching that will be passed to scanners. */
+ public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows";
+
+ /** mutable objects that are used to avoid recreation of wrapper objects */
+ protected HBaseKey hbaseKey;
+
+ protected HBaseResult hbaseResult;
+
+ private org.apache.hadoop.conf.Configuration hConf;
+
+ @Override
+ public void configure(Configuration parameters) {
+ HTable table = createTable(parameters);
+ setTable(table);
+ Scan scan = createScanner(parameters);
+ setScan(scan);
+ }
+
+ /**
+ * Read the configuration and creates a {@link Scan} object.
+ *
+ * @param parameters
+ * @return
+ */
+ protected Scan createScanner(Configuration parameters) {
+ Scan scan = null;
+ if (parameters.getString(SCAN, null) != null) {
+ try {
+ scan = HBaseUtil.convertStringToScan(parameters.getString(SCAN, null));
+ } catch (IOException e) {
+ LOG.error("An error occurred.", e);
+ }
+ } else {
+ try {
+ scan = new Scan();
+
+ // if (parameters.getString(SCAN_COLUMNS, null) != null) {
+ // scan.addColumns(parameters.getString(SCAN_COLUMNS, null));
+ // }
+
+ if (parameters.getString(SCAN_COLUMN_FAMILY, null) != null) {
+ scan.addFamily(Bytes.toBytes(parameters.getString(SCAN_COLUMN_FAMILY, null)));
+ }
+
+ if (parameters.getString(SCAN_TIMESTAMP, null) != null) {
+ scan.setTimeStamp(Long.parseLong(parameters.getString(SCAN_TIMESTAMP, null)));
+ }
+
+ if (parameters.getString(SCAN_TIMERANGE_START, null) != null
+ && parameters.getString(SCAN_TIMERANGE_END, null) != null) {
+ scan.setTimeRange(
+ Long.parseLong(parameters.getString(SCAN_TIMERANGE_START, null)),
+ Long.parseLong(parameters.getString(SCAN_TIMERANGE_END, null)));
+ }
+
+ if (parameters.getString(SCAN_MAXVERSIONS, null) != null) {
+ scan.setMaxVersions(Integer.parseInt(parameters.getString(SCAN_MAXVERSIONS, null)));
+ }
+
+ if (parameters.getString(SCAN_CACHEDROWS, null) != null) {
+ scan.setCaching(Integer.parseInt(parameters.getString(SCAN_CACHEDROWS, null)));
+ }
+
+ // false by default, full table scans generate too much BC churn
+ scan.setCacheBlocks((parameters.getBoolean(SCAN_CACHEBLOCKS, false)));
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ }
+ return scan;
+ }
+
+ /**
+ * Create an {@link HTable} instance and set it into this format.
+ *
+ * @param parameters
+ * a {@link Configuration} that holds at least the table name.
+ */
+ protected HTable createTable(Configuration parameters) {
+ String configLocation = parameters.getString(TableInputFormat.CONFIG_LOCATION, null);
+ LOG.info("Got config location: " + configLocation);
+ if (configLocation != null)
+ {
+ org.apache.hadoop.conf.Configuration dummyConf = new org.apache.hadoop.conf.Configuration();
+ if(OperatingSystem.isWindows()) {
+ dummyConf.addResource(new Path("file:/" + configLocation));
+ } else {
+ dummyConf.addResource(new Path("file://" + configLocation));
+ }
+ hConf = HBaseConfiguration.create(dummyConf);
+ ;
+ // hConf.set("hbase.master", "im1a5.internetmemory.org");
+ LOG.info("hbase master: " + hConf.get("hbase.master"));
+ LOG.info("zookeeper quorum: " + hConf.get("hbase.zookeeper.quorum"));
+
+ }
+ String tableName = parameters.getString(INPUT_TABLE, "");
+ try {
+ return new HTable(this.hConf, tableName);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ return null;
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return this.endReached;
+ }
+
+ protected boolean nextResult() throws IOException {
+ if (this.tableRecordReader == null)
+ {
+ throw new IOException("No table record reader provided!");
+ }
+
+ try {
+ if (this.tableRecordReader.nextKeyValue())
+ {
+ ImmutableBytesWritable currentKey = this.tableRecordReader.getCurrentKey();
+ Result currentValue = this.tableRecordReader.getCurrentValue();
+
+ hbaseKey.setWritable(currentKey);
+ hbaseResult.setResult(currentValue);
+ } else
+ {
+ this.endReached = true;
+ return false;
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Table reader has been interrupted", e);
+ throw new IOException(e);
+ }
+
+ return true;
+ }
+
+ @Override
+ public Record nextRecord(Record record) throws IOException {
+ if (nextResult()) {
+ mapResultToRecord(record, hbaseKey, hbaseResult);
+ return record;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Maps the current HBase Result into a Record.
+ * This implementation simply stores the HBaseKey at position 0, and the HBase Result object at position 1.
+ *
+ * @param record
+ * @param key
+ * @param result
+ */
+ public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) {
+ record.setField(0, key);
+ record.setField(1, result);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.tableRecordReader.close();
+ }
+
+ @Override
+ public void open(TableInputSplit split) throws IOException {
+ if (split == null)
+ {
+ throw new IOException("Input split is null!");
+ }
+
+ if (this.table == null)
+ {
+ throw new IOException("No HTable provided!");
+ }
+
+ if (this.scan == null)
+ {
+ throw new IOException("No Scan instance provided");
+ }
+
+ this.tableRecordReader = new TableRecordReader();
+
+ this.tableRecordReader.setHTable(this.table);
+
+ Scan sc = new Scan(this.scan);
+ sc.setStartRow(split.getStartRow());
+ LOG.info("split start row: " + new String(split.getStartRow()));
+ sc.setStopRow(split.getEndRow());
+ LOG.info("split end row: " + new String(split.getEndRow()));
+
+ this.tableRecordReader.setScan(sc);
+ this.tableRecordReader.restart(split.getStartRow());
+
+ this.hbaseKey = new HBaseKey();
+ this.hbaseResult = new HBaseResult();
+
+ endReached = false;
+ }
+
+
+ @Override
+ public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+
+ if (this.table == null) {
+ throw new IOException("No table was provided.");
+ }
+
+ final Pair<byte[][], byte[][]> keys = this.table.getStartEndKeys();
+
+ if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+
+ throw new IOException("Expecting at least one region.");
+ }
+ int count = 0;
+ final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(keys.getFirst().length);
+ for (int i = 0; i < keys.getFirst().length; i++) {
+
+ if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+ continue;
+ }
+
+ final String regionLocation = this.table.getRegionLocation(keys.getFirst()[i], false).getHostnamePort();
+ final byte[] startRow = this.scan.getStartRow();
+ final byte[] stopRow = this.scan.getStopRow();
+
+ // determine if the given start an stop key fall into the region
+ if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+ Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+ (stopRow.length == 0 ||
+ Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+
+ final byte[] splitStart = startRow.length == 0 ||
+ Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+ keys.getFirst()[i] : startRow;
+ final byte[] splitStop = (stopRow.length == 0 ||
+ Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+ keys.getSecond()[i].length > 0 ?
+ keys.getSecond()[i] : stopRow;
+ final TableInputSplit split = new TableInputSplit(splits.size(), new String[] { regionLocation },
+ this.table.getTableName(), splitStart, splitStop);
+ splits.add(split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+ }
+ }
+ }
+
+ return splits.toArray(new TableInputSplit[0]);
+ }
+
+ /**
+ * Test if the given region is to be included in the InputSplit while splitting
+ * the regions of a table.
+ * <p>
+ * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
+ * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
+ * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R
+ * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due
+ * to the ordering of the keys. <br>
+ * <br>
+ * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br>
+ * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded(
+ * i.e. all regions are included).
+ *
+ * @param startKey
+ * Start key of the region
+ * @param endKey
+ * End key of the region
+ * @return true, if this region needs to be included as part of the input (default).
+ */
+ private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+ return true;
+ }
+
+
+ @Override
+ public Class<TableInputSplit> getInputSplitType() {
+
+ return TableInputSplit.class;
+ }
+
+ public void setTable(HTable table)
+ {
+ this.table = table;
+ }
+
+ public HTable getTable() {
+ return table;
+ }
+
+ public void setScan(Scan scan)
+ {
+ this.scan = scan;
+ }
+
+ public Scan getScan() {
+ return scan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
new file mode 100644
index 0000000..a77402d
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * This class implements a input splits for HBase. Each table input split corresponds to a key range (low, high). All
+ * references to row below refer to the key of the row.
+ */
+public class TableInputSplit extends LocatableInputSplit {
+
+ /**
+ * The name of the table to retrieve data from
+ */
+ private byte[] tableName;
+
+ /**
+ * The start row of the split.
+ */
+ private byte[] startRow;
+
+ /**
+ * The end row of the split.
+ */
+ private byte[] endRow;
+
+ /**
+ * Creates a new table input split
+ *
+ * @param splitNumber
+ * the number of the input split
+ * @param hostnames
+ * the names of the hosts storing the data the input split refers to
+ * @param tableName
+ * the name of the table to retrieve data from
+ * @param startRow
+ * the start row of the split
+ * @param endRow
+ * the end row of the split
+ */
+ TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
+ final byte[] endRow) {
+ super(splitNumber, hostnames);
+
+ this.tableName = tableName;
+ this.startRow = startRow;
+ this.endRow = endRow;
+ }
+
+ /**
+ * Default constructor for serialization/deserialization.
+ */
+ public TableInputSplit() {
+ super();
+
+ this.tableName = null;
+ this.startRow = null;
+ this.endRow = null;
+ }
+
+ /**
+ * Returns the table name.
+ *
+ * @return The table name.
+ */
+ public byte[] getTableName() {
+ return this.tableName;
+ }
+
+ /**
+ * Returns the start row.
+ *
+ * @return The start row.
+ */
+ public byte[] getStartRow() {
+ return this.startRow;
+ }
+
+ /**
+ * Returns the end row.
+ *
+ * @return The end row.
+ */
+ public byte[] getEndRow() {
+ return this.endRow;
+ }
+
+
+ @Override
+ public void write(final DataOutputView out) throws IOException {
+
+ super.write(out);
+
+ // Write the table name
+ if (this.tableName == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(this.tableName.length);
+ out.write(this.tableName);
+ }
+
+ // Write the start row
+ if (this.startRow == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(this.startRow.length);
+ out.write(this.startRow);
+ }
+
+ // Write the end row
+ if (this.endRow == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(this.endRow.length);
+ out.write(this.endRow);
+ }
+ }
+
+
+ @Override
+ public void read(final DataInputView in) throws IOException {
+
+ super.read(in);
+
+ // Read the table name
+ int len = in.readInt();
+ if (len >= 0) {
+ this.tableName = new byte[len];
+ in.readFully(this.tableName);
+ }
+
+ // Read the start row
+ len = in.readInt();
+ if (len >= 0) {
+ this.startRow = new byte[len];
+ in.readFully(this.startRow);
+ }
+
+ // Read the end row
+ len = in.readInt();
+ if (len >= 0) {
+ this.endRow = new byte[len];
+ in.readFully(this.endRow);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
new file mode 100644
index 0000000..44d64de
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseKey.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.common;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Simple wrapper to encapsulate an HBase h{@link ImmutableBytesWritable} as a Key
+ */
+public class HBaseKey implements Key<HBaseKey> {
+
+ private static final long serialVersionUID = 1L;
+
+ private ImmutableBytesWritable writable;
+
+
+ public HBaseKey() {
+ this.writable = new ImmutableBytesWritable();
+ }
+
+
+ public HBaseKey(ImmutableBytesWritable writable) {
+ this.writable = writable;
+ }
+
+
+ public ImmutableBytesWritable getWritable() {
+ return writable;
+ }
+
+ public void setWritable(ImmutableBytesWritable writable) {
+ this.writable = writable;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ this.writable.write(out);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.writable.readFields(in);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.writable.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == HBaseKey.class) {
+ return this.writable.equals(((HBaseKey) obj).writable);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int compareTo(HBaseKey other) {
+ return this.writable.compareTo(other.writable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
new file mode 100644
index 0000000..d66f59f
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseResult.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.common;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+import org.apache.hadoop.hbase.client.Result;
+
+public class HBaseResult implements Value {
+
+ private static final long serialVersionUID = 1L;
+
+ private Result result;
+
+
+ public HBaseResult() {
+ this.result = new Result();
+ }
+
+ public HBaseResult(Result result) {
+ this.result = result;
+ }
+
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public String getStringData() {
+ if(this.result != null) {
+ return this.result.toString();
+ }
+ return null;
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.result.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ this.result.write(out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
new file mode 100644
index 0000000..c1911c5
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/common/HBaseUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Base64;
+
+/**
+ * Utility for {@link TableInputFormat}
+ */
+public class HBaseUtil {
+
+ /**
+ * Writes the given scan into a Base64 encoded string.
+ *
+ * @param scan
+ * The scan to write out.
+ * @return The scan saved in a Base64 encoded string.
+ * @throws IOException
+ * When writing the scan fails.
+ */
+ static String convertScanToString(Scan scan) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(out);
+ scan.write(dos);
+ return Base64.encodeBytes(out.toByteArray());
+ }
+
+ /**
+ * Converts the given Base64 string back into a Scan instance.
+ *
+ * @param base64
+ * The scan details.
+ * @return The newly created Scan instance.
+ * @throws IOException
+ * When reading the scan instance fails.
+ */
+ public static Scan convertStringToScan(String base64) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
+ DataInputStream dis = new DataInputStream(bis);
+ Scan scan = new Scan();
+ scan.readFields(dis);
+ return scan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100644
index 0000000..a7bc2b3
--- /dev/null
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.addons.hbase.common.HBaseKey;
+import org.apache.flink.addons.hbase.common.HBaseResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.io.CsvOutputFormat;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * the occurrences of each word in the file.
+ */
+public class HBaseReadExample implements Program, ProgramDescription {
+
+ public static class MyTableInputFormat extends TableInputFormat {
+
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] META_FAMILY = "meta".getBytes();
+
+ private final byte[] USER_COLUMN = "user".getBytes();
+
+ private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes();
+
+ private final byte[] TEXT_FAMILY = "text".getBytes();
+
+ private final byte[] TWEET_COLUMN = "tweet".getBytes();
+
+ public MyTableInputFormat() {
+ super();
+
+ }
+
+ @Override
+ protected HTable createTable(Configuration parameters) {
+ return super.createTable(parameters);
+ }
+
+ @Override
+ protected Scan createScanner(Configuration parameters) {
+ Scan scan = new Scan ();
+ scan.addColumn (META_FAMILY, USER_COLUMN);
+ scan.addColumn (META_FAMILY, TIMESTAMP_COLUMN);
+ scan.addColumn (TEXT_FAMILY, TWEET_COLUMN);
+ return scan;
+ }
+
+ StringValue row_string = new StringValue();
+ StringValue user_string = new StringValue();
+ StringValue timestamp_string = new StringValue();
+ StringValue tweet_string = new StringValue();
+
+ @Override
+ public void mapResultToRecord(Record record, HBaseKey key,
+ HBaseResult result) {
+ Result res = result.getResult();
+ res.getRow();
+ record.setField(0, toString(row_string, res.getRow()));
+ record.setField(1, toString (user_string, res.getValue(META_FAMILY, USER_COLUMN)));
+ record.setField(2, toString (timestamp_string, res.getValue(META_FAMILY, TIMESTAMP_COLUMN)));
+ record.setField(3, toString (tweet_string, res.getValue(TEXT_FAMILY, TWEET_COLUMN)));
+ }
+
+ private final StringValue toString (StringValue string, byte[] bytes) {
+ string.setValueAscii(bytes, 0, bytes.length);
+ return string;
+ }
+
+ }
+
+
+ @Override
+ public Plan getPlan(String... args) {
+ // parse job parameters
+ int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
+ String output = (args.length > 1 ? args[1] : "");
+
+ GenericDataSource<TableInputFormat> source = new GenericDataSource<TableInputFormat>(new MyTableInputFormat(), "HBase Input");
+ source.setParameter(TableInputFormat.INPUT_TABLE, "twitter");
+ source.setParameter(TableInputFormat.CONFIG_LOCATION, "/etc/hbase/conf/hbase-site.xml");
+ FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, source, "HBase String dump");
+ CsvOutputFormat.configureRecordFormat(out)
+ .recordDelimiter('\n')
+ .fieldDelimiter(' ')
+ .field(StringValue.class, 0)
+ .field(StringValue.class, 1)
+ .field(StringValue.class, 2)
+ .field(StringValue.class, 3);
+
+ Plan plan = new Plan(out, "HBase access Example");
+ plan.setDefaultParallelism(numSubTasks);
+ return plan;
+ }
+
+
+ @Override
+ public String getDescription() {
+ return "Parameters: [numSubStasks] [input] [output]";
+ }
+}