Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:03 UTC
[63/92] [abbrv] prefix all projects in addons and quickstarts with flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
deleted file mode 100644
index 2a0c4d3..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-@SuppressWarnings("rawtypes")
-public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
- flinkRecord.setField(0, convertKey(hadoopKey));
- flinkRecord.setField(1, convertValue(hadoopValue));
- }
-
- @SuppressWarnings("unchecked")
- private final Value convertKey(K in) {
- return new WritableComparableWrapper(in);
- }
-
- private final Value convertValue(V in) {
- return new WritableWrapper<V>(in);
- }
-}
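For context, the converter above plugged into a HadoopDataSource, as the Record-API WordCount below shows. A minimal sketch (jobConf and dataInput are assumed):

    JobConf jobConf = new JobConf();
    HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
            new TextInputFormat(), jobConf, "Input Lines",
            new WritableWrapperConverter<LongWritable, Text>());  // keys/values stay wrapped Writables
    TextInputFormat.addInputPath(jobConf, new Path(dataInput));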
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
deleted file mode 100644
index 25caf0c..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.WritableWrapperConverter;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file.
- *
- * <br /><br />
- *
- * <b>Note</b>: This example uses the outdated Record API.
- * It is recommended to use the new Java API instead.
- *
- * @see org.apache.flink.hadoopcompatibility.mapred.example.WordCount
- */
-public class WordCount implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
-
- /**
- * Converts a Record containing one string into multiple string/integer pairs.
- * The string is tokenized by whitespace. For each token a new record is emitted,
- * where the token is the first field and an Integer(1) is the second field.
- */
- public static class TokenizeLine extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> collector) {
- // get the line text (field 1, as type StringValue) from the record
- String line = record.getField(1, StringValue.class).getValue();
- // normalize the line
- line = line.replaceAll("\\W+", " ").toLowerCase();
-
- // tokenize the line
- StringTokenizer tokenizer = new StringTokenizer(line);
- while (tokenizer.hasMoreTokens()) {
- String word = tokenizer.nextToken();
-
- // we emit a (word, 1) pair
- collector.collect(new Record(new StringValue(word), new IntValue(1)));
- }
- }
- }
-
- /**
- * Sums up the counts for a given key. The counts are assumed to be at position <code>1</code>
- * in the record. The other fields are not modified.
- */
- @Combinable
- @ConstantFields(0)
- public static class CountWords extends ReduceFunction implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- Record element = null;
- int sum = 0;
- while (records.hasNext()) {
- element = records.next();
- int cnt = element.getField(1, IntValue.class).getValue();
- sum += cnt;
- }
-
- element.setField(1, new IntValue(sum));
- out.collect(element);
- }
-
- @Override
- public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
- // the logic is the same as in the reduce function, so simply call the reduce method
- reduce(records, out);
- }
- }
-
-
- @SuppressWarnings({ "rawtypes", "unchecked", "unused" })
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- String dataInput = (args.length > 1 ? args[1] : "");
- String output = (args.length > 2 ? args[2] : "");
-
-
- HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
- TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-
- // Example with Wrapper Converter
- HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>(
- new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
- TextInputFormat.addInputPath(sourceHadoopType.getJobConf(), new Path(dataInput));
-
- MapOperator mapper = MapOperator.builder(new TokenizeLine())
- .input(source)
- .name("Tokenize Lines")
- .build();
- ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
- .input(mapper)
- .name("Count Words")
- .build();
- FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts");
- CsvOutputFormat.configureRecordFormat(out)
- .recordDelimiter('\n')
- .fieldDelimiter(' ')
- .field(StringValue.class, 0)
- .field(IntValue.class, 1);
-
- Plan plan = new Plan(out, "WordCount Example");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
- }
-
-
- @Override
- public String getDescription() {
- return "Parameters: [numSubStasks] [input] [output]";
- }
-
-
- public static void main(String[] args) throws Exception {
- WordCount wc = new WordCount();
-
- if (args.length < 3) {
- System.err.println(wc.getDescription());
- System.exit(1);
- }
-
- Plan plan = wc.getPlan(args);
-
- // This will execute the word count embedded in a local context. Replace this line with the
- // commented lines below to send the job to a local installation or to a cluster for execution.
- LocalExecutor.execute(plan);
-// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-// ex.executePlan(plan);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
deleted file mode 100644
index a3cd3d5..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSink;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file.
- *
- * <br /><br />
- *
- * <b>Note</b>: This example uses the outdated Record API.
- * It is recommended to use the new Java API instead.
- *
- * @see WordCount
- */
-@SuppressWarnings("serial")
-public class WordCountWithOutputFormat implements Program, ProgramDescription {
-
- /**
- * Converts a Record containing one string into multiple string/integer pairs.
- * The string is tokenized by whitespace. For each token a new record is emitted,
- * where the token is the first field and an Integer(1) is the second field.
- */
- public static class TokenizeLine extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> collector) {
- // get the line text (field 1, as type StringValue) from the record
- String line = record.getField(1, StringValue.class).getValue();
- // normalize the line
- line = line.replaceAll("\\W+", " ").toLowerCase();
-
- // tokenize the line
- StringTokenizer tokenizer = new StringTokenizer(line);
- while (tokenizer.hasMoreTokens()) {
- String word = tokenizer.nextToken();
-
- // we emit a (word, 1) pair
- collector.collect(new Record(new StringValue(word), new IntValue(1)));
- }
- }
- }
-
- /**
- * Sums up the counts for a given key. The counts are assumed to be at position <code>1</code>
- * in the record. The other fields are not modified.
- */
- @Combinable
- @ConstantFields(0)
- public static class CountWords extends ReduceFunction implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- Record element = null;
- int sum = 0;
- while (records.hasNext()) {
- element = records.next();
- int cnt = element.getField(1, IntValue.class).getValue();
- sum += cnt;
- }
-
- element.setField(1, new IntValue(sum));
- out.collect(element);
- }
-
- @Override
- public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
- // the logic is the same as in the reduce function, so simply call the reduce method
- reduce(records, out);
- }
- }
-
-
- @Override
- public Plan getPlan(String... args) {
- // parse job parameters
- int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
- String dataInput = (args.length > 1 ? args[1] : "");
- String output = (args.length > 2 ? args[2] : "");
-
- HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
- new TextInputFormat(), new JobConf(), "Input Lines");
- TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-
-
- MapOperator mapper = MapOperator.builder(new TokenizeLine())
- .input(source)
- .name("Tokenize Lines")
- .build();
- ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
- .input(mapper)
- .name("Count Words")
- .build();
- HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(),new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
- TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
-
- Plan plan = new Plan(out, "Hadoop OutputFormat Example");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
- }
-
-
- @Override
- public String getDescription() {
- return "Parameters: [numSubStasks] [input] [output]";
- }
-
-
- public static void main(String[] args) throws Exception {
- WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
-
- if (args.length < 3) {
- System.err.println(wc.getDescription());
- System.exit(1);
- }
-
- Plan plan = wc.getPlan(args);
-
- // This will execute the word count embedded in a local context. Replace this line with the
- // commented lines below to send the job to a local installation or to a cluster for execution.
- LocalExecutor.execute(plan);
-// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-// ex.executePlan(plan);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
deleted file mode 100644
index c679733..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-
-
-public class HadoopUtils {
-
- /**
- * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
- */
- public static void mergeHadoopConf(JobConf jobConf) {
- org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
- for (Map.Entry<String, String> e : hadoopConf) {
- jobConf.set(e.getKey(), e.getValue());
- }
- }
-
- public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
- try {
- Class<?> clazz = null;
- // for Hadoop 1.xx
- if(!TaskAttemptContext.class.isInterface()) {
- clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader());
- }
- // for Hadoop 2.xx
- else {
- clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
- }
- Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class);
- // in Hadoop 1.x the constructor is not public, so make it accessible
- constructor.setAccessible(true);
- JobContext context = (JobContext) constructor.newInstance(jobConf, jobId);
-
- return context;
- } catch(Exception e) {
- throw new Exception("Could not create instance of JobContext.", e);
- }
- }
-
- public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception {
- try {
- Class<?> clazz = null;
- // for Hadoop 1.xx
- if(!TaskAttemptContext.class.isInterface()) {
- clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader());
- }
- // for Hadoop 2.xx
- else {
- clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader());
- }
- Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
- // in Hadoop 1.x the constructor is not public, so make it accessible
- constructor.setAccessible(true);
- TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
- return context;
- } catch(Exception e) {
- throw new Exception("Could not create instance of TaskAttemptContext.", e);
- }
- }
-}
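A minimal usage sketch for the helpers above, mirroring how the output-format code in this commit builds a task context (the attempt-id string is copied from HadoopOutputFormat and is an assumption here):

    JobConf jobConf = new JobConf();
    HadoopUtils.mergeHadoopConf(jobConf);  // pull the HDFS settings into the JobConf
    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_000001_0");  // assumed id
    TaskAttemptContext context = HadoopUtils.instantiateTaskAttemptContext(jobConf, taskAttemptID);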
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
deleted file mode 100644
index 058a60f..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * A dummy Progressable that simply ignores progress reports.
- */
-public class HadoopDummyProgressable implements Progressable {
- @Override
- public void progress() {
-
- }
-}
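The dummy satisfies Hadoop APIs that require a Progressable when Flink has no real task to report progress to. A hedged sketch against the mapred TextOutputFormat (jobConf assumed; the FileSystem argument is ignored by this format):

    TextOutputFormat<Text, IntWritable> format = new TextOutputFormat<Text, IntWritable>();
    RecordWriter<Text, IntWritable> writer =
            format.getRecordWriter(null, jobConf, "part-00000", new HadoopDummyProgressable());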
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
deleted file mode 100644
index 87a6014..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * A dummy progress monitor / Reporter that ignores all reports.
- */
-public class HadoopDummyReporter implements Reporter {
-
- @Override
- public void progress() {
- }
-
- @Override
- public void setStatus(String status) {
-
- }
-
- @Override
- public Counter getCounter(Enum<?> name) {
- return null;
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return null;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
-
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
-
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- @Override
- public float getProgress() {
- return 0;
- }
-
-}
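Analogously, the dummy Reporter fills the reporter slot of the mapred InputFormat contract; a sketch (format, split and jobConf assumed):

    RecordReader<LongWritable, Text> reader =
            format.getRecordReader(split, jobConf, new HadoopDummyReporter());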
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
deleted file mode 100644
index da46690..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapred.JobConf;
-
-
-public class HadoopInputSplit implements InputSplit {
-
- private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
- private JobConf jobConf;
- private int splitNumber;
- private String hadoopInputSplitTypeName;
-
-
- public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
- return hadoopInputSplit;
- }
-
-
- public HadoopInputSplit() {
- super();
- }
-
-
- public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
- this.hadoopInputSplit = hInputSplit;
- this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
- this.jobConf = jobconf;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeInt(splitNumber);
- out.writeUTF(hadoopInputSplitTypeName);
- hadoopInputSplit.write(out);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- this.splitNumber=in.readInt();
- this.hadoopInputSplitTypeName = in.readUTF();
- if(hadoopInputSplit == null) {
- try {
- Class<? extends org.apache.hadoop.io.Writable> inputSplit =
- Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
- this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
- }
- catch (Exception e) {
- throw new RuntimeException("Unable to create InputSplit", e);
- }
- }
- this.hadoopInputSplit.readFields(in);
- }
-
- @Override
- public int getSplitNumber() {
- return this.splitNumber;
- }
-
- public void setSplitNumber(int splitNumber) {
- this.splitNumber = splitNumber;
- }
-
- public void setHadoopInputSplit(
- org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
- this.hadoopInputSplit = hadoopInputSplit;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
deleted file mode 100644
index cf12cae..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.types.TypeInformation;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
-
- private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
- private Class<K> keyClass;
- private Class<V> valueClass;
- private org.apache.hadoop.conf.Configuration configuration;
-
- private transient RecordReader<K, V> recordReader;
- private boolean fetched = false;
- private boolean hasNext;
-
- public HadoopInputFormat() {
- super();
- }
-
- public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
- super();
- this.mapreduceInputFormat = mapreduceInputFormat;
- this.keyClass = key;
- this.valueClass = value;
- this.configuration = job.getConfiguration();
- HadoopUtils.mergeHadoopConf(configuration);
- }
-
- public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
- this.configuration = configuration;
- }
-
- public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
- return this.mapreduceInputFormat;
- }
-
- public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
- this.mapreduceInputFormat = mapreduceInputFormat;
- }
-
- public org.apache.hadoop.conf.Configuration getConfiguration() {
- return this.configuration;
- }
-
- // --------------------------------------------------------------------------------------------
- // InputFormat
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void configure(Configuration parameters) {
- // nothing to do
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
- // only gather base statistics for FileInputFormats
- if(!(mapreduceInputFormat instanceof FileInputFormat)) {
- return null;
- }
-
- JobContext jobContext = null;
- try {
- jobContext = HadoopUtils.instantiateJobContext(configuration, null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
- (FileBaseStatistics) cachedStats : null;
-
- try {
- final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
- return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
- } catch (IOException ioex) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Could not determine statistics due to an io error: "
- + ioex.getMessage());
- }
- } catch (Throwable t) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Unexpected problem while getting the file statistics: "
- + t.getMessage(), t);
- }
- }
-
- // no statistics available
- return null;
- }
-
- @Override
- public HadoopInputSplit[] createInputSplits(int minNumSplits)
- throws IOException {
- configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-
- JobContext jobContext = null;
- try {
- jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- List<org.apache.hadoop.mapreduce.InputSplit> splits;
- try {
- splits = this.mapreduceInputFormat.getSplits(jobContext);
- } catch (InterruptedException e) {
- throw new IOException("Could not get Splits.", e);
- }
- HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-
- for(int i = 0; i < hadoopInputSplits.length; i++){
- hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext);
- }
- return hadoopInputSplits;
- }
-
- @Override
- public Class<? extends HadoopInputSplit> getInputSplitType() {
- return HadoopInputSplit.class;
- }
-
- @Override
- public void open(HadoopInputSplit split) throws IOException {
- TaskAttemptContext context = null;
- try {
- context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
-
- try {
- this.recordReader = this.mapreduceInputFormat
- .createRecordReader(split.getHadoopInputSplit(), context);
- this.recordReader.initialize(split.getHadoopInputSplit(), context);
- } catch (InterruptedException e) {
- throw new IOException("Could not create RecordReader.", e);
- } finally {
- this.fetched = false;
- }
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- if(!this.fetched) {
- fetchNext();
- }
- return !this.hasNext;
- }
-
- private void fetchNext() throws IOException {
- try {
- this.hasNext = this.recordReader.nextKeyValue();
- } catch (InterruptedException e) {
- throw new IOException("Could not fetch next KeyValue pair.", e);
- } finally {
- this.fetched = true;
- }
- }
-
- @Override
- public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
- if(!this.fetched) {
- fetchNext();
- }
- if(!this.hasNext) {
- return null;
- }
- try {
- record.f0 = this.recordReader.getCurrentKey();
- record.f1 = this.recordReader.getCurrentValue();
- } catch (InterruptedException e) {
- throw new IOException("Could not get KeyValue pair.", e);
- }
- this.fetched = false;
-
- return record;
- }
-
- @Override
- public void close() throws IOException {
- this.recordReader.close();
- }
-
- // --------------------------------------------------------------------------------------------
- // Helper methods
- // --------------------------------------------------------------------------------------------
-
- private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
- ArrayList<FileStatus> files) throws IOException {
-
- long latestModTime = 0L;
-
- // get the file info and check whether the cached statistics are still valid.
- for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-
- final Path filePath = new Path(hadoopPath.toUri());
- final FileSystem fs = FileSystem.get(filePath.toUri());
-
- final FileStatus file = fs.getFileStatus(filePath);
- latestModTime = Math.max(latestModTime, file.getModificationTime());
-
- // enumerate all files and check their modification time stamp.
- if (file.isDir()) {
- FileStatus[] fss = fs.listStatus(filePath);
- files.ensureCapacity(files.size() + fss.length);
-
- for (FileStatus s : fss) {
- if (!s.isDir()) {
- files.add(s);
- latestModTime = Math.max(s.getModificationTime(), latestModTime);
- }
- }
- } else {
- files.add(file);
- }
- }
-
- // check whether the cached statistics are still valid, if we have any
- if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
- return cachedStats;
- }
-
- // calculate the whole length
- long len = 0;
- for (FileStatus s : files) {
- len += s.getLen();
- }
-
- // sanity check
- if (len <= 0) {
- len = BaseStatistics.SIZE_UNKNOWN;
- }
-
- return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom serialization methods
- // --------------------------------------------------------------------------------------------
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeUTF(this.mapreduceInputFormat.getClass().getName());
- out.writeUTF(this.keyClass.getName());
- out.writeUTF(this.valueClass.getName());
- this.configuration.write(out);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- String hadoopInputFormatClassName = in.readUTF();
- String keyClassName = in.readUTF();
- String valueClassName = in.readUTF();
-
- org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
- configuration.readFields(in);
-
- if(this.configuration == null) {
- this.configuration = configuration;
- }
-
- try {
- this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
- } catch (Exception e) {
- throw new RuntimeException("Unable to instantiate the hadoop input format", e);
- }
- try {
- this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
- } catch (Exception e) {
- throw new RuntimeException("Unable to find key class.", e);
- }
- try {
- this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
- } catch (Exception e) {
- throw new RuntimeException("Unable to find value class.", e);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // ResultTypeQueryable
- // --------------------------------------------------------------------------------------------
-
- @Override
- public TypeInformation<Tuple2<K,V>> getProducedType() {
- return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
- }
-}
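Typical wiring of this input format, as the mapreduce WordCount example later in this commit does it (env and inputPath assumed):

    Job job = Job.getInstance();
    HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(
            new TextInputFormat(), LongWritable.class, Text.class, job);
    TextInputFormat.addInputPath(job, new Path(inputPath));
    DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);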
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
deleted file mode 100644
index 9eabc03..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-
-
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
-
- private static final long serialVersionUID = 1L;
-
- private org.apache.hadoop.conf.Configuration configuration;
- private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
- private transient RecordWriter<K,V> recordWriter;
- private transient FileOutputCommitter fileOutputCommitter;
- private transient TaskAttemptContext context;
-
- public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
- super();
- this.mapreduceOutputFormat = mapreduceOutputFormat;
- this.configuration = job.getConfiguration();
- HadoopUtils.mergeHadoopConf(configuration);
- }
-
- public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
- this.configuration = configuration;
- }
-
- public org.apache.hadoop.conf.Configuration getConfiguration() {
- return this.configuration;
- }
-
- public org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() {
- return this.mapreduceOutputFormat;
- }
-
- public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) {
- this.mapreduceOutputFormat = mapreduceOutputFormat;
- }
-
- // --------------------------------------------------------------------------------------------
- // OutputFormat
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void configure(Configuration parameters) {
- // nothing to do
- }
-
- /**
- * Creates the temporary output file for the Hadoop RecordWriter.
- * @param taskNumber The number of the parallel instance.
- * @param numTasks The number of parallel tasks.
- * @throws IOException
- */
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- if (Integer.toString(taskNumber + 1).length() > 6) {
- throw new IOException("Task id too large.");
- }
-
- // for hadoop 2.2
- this.configuration.set("mapreduce.output.basename", "tmp");
-
- TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
- + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
- + Integer.toString(taskNumber + 1)
- + "_0");
-
- try {
- this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- this.configuration.set("mapred.task.id", taskAttemptID.toString());
- // for hadoop 2.2
- this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-
- this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
-
- try {
- this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
- this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
-
- try {
- this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
- } catch (InterruptedException e) {
- throw new IOException("Could not create RecordWriter.", e);
- }
- }
-
-
- @Override
- public void writeRecord(Tuple2<K, V> record) throws IOException {
- try {
- this.recordWriter.write(record.f0, record.f1);
- } catch (InterruptedException e) {
- throw new IOException("Could not write Record.", e);
- }
- }
-
- /**
- * Commits the task by moving the output file out of the temporary directory.
- * @throws IOException
- */
- @Override
- public void close() throws IOException {
- try {
- this.recordWriter.close(this.context);
- } catch (InterruptedException e) {
- throw new IOException("Could not close RecordReader.", e);
- }
-
- if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
- this.fileOutputCommitter.commitTask(this.context);
- }
- this.fileOutputCommitter.commitJob(this.context);
-
- // rename tmp-* files to final name
- FileSystem fs = FileSystem.get(this.configuration);
-
- Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
- final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
-
- // isDirectory does not work in hadoop 1
- if(fs.getFileStatus(outputPath).isDir()) {
- FileStatus[] files = fs.listStatus(outputPath);
-
- for(FileStatus f : files) {
- Matcher m = p.matcher(f.getPath().getName());
- if(m.matches()) {
- int part = Integer.valueOf(m.group(2));
- fs.rename(f.getPath(), new Path(outputPath.toString()+"/"+part));
- }
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom serialization methods
- // --------------------------------------------------------------------------------------------
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
- this.configuration.write(out);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- String hadoopOutputFormatClassName = in.readUTF();
-
- org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
- configuration.readFields(in);
-
- if(this.configuration == null) {
- this.configuration = configuration;
- }
-
- try {
- this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
- } catch (Exception e) {
- throw new RuntimeException("Unable to instantiate the hadoop output format", e);
- }
- }
-}
\ No newline at end of file
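And the matching sink side, again following the WordCount example below (job, result and outputPath assumed):

    HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(
            new TextOutputFormat<Text, IntWritable>(), job);
    TextOutputFormat.setOutputPath(job, new Path(outputPath));
    result.output(hadoopOutputFormat);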
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
deleted file mode 100644
index 36ea378..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapreduce.example;
-
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- *
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
- * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
- */
-@SuppressWarnings("serial")
-public class WordCount {
-
- public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("Usage: WordCount <input path> <result path>");
- return;
- }
-
- final String inputPath = args[0];
- final String outputPath = args[1];
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
-
- // Set up the Hadoop Input Format
- Job job = Job.getInstance();
- HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
- TextInputFormat.addInputPath(job, new Path(inputPath));
-
- // Create a Flink job with it
- DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-
- // Tokenize the line and convert from Writable "Text" to String for better handling
- DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
-
- // Sum up the words
- DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
-
- // Convert String back to Writable "Text" for use with Hadoop Output Format
- DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
-
- // Set up Hadoop Output Format
- HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
- hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
- hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
- // is being executed with both types (hadoop1 and hadoop2 profile)
- TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
- // Output & Execute
- hadoopResult.output(hadoopOutputFormat);
- env.execute("Word Count");
- }
-
- /**
- * Splits a line into words and converts Hadoop Writables into normal Java data types.
- */
- public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String line = value.f1.toString();
- String[] tokens = line.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-
- /**
- * Converts Java data types to Hadoop Writables.
- */
- public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-
- @Override
- public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
- return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
deleted file mode 100644
index eadbd0b..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapreduce.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-public class HadoopUtils {
-
- /**
- * Merges the cluster's Hadoop configuration into the given Configuration. This is necessary to pick up the HDFS settings.
- */
- public static void mergeHadoopConf(Configuration configuration) {
- Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
-
- for (Map.Entry<String, String> e : hadoopConf) {
- configuration.set(e.getKey(), e.getValue());
- }
- }
-
- public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
- try {
- Class<?> clazz;
- // for Hadoop 2.x, where JobContext is an interface
- if(JobContext.class.isInterface()) {
- clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
- }
- // for Hadoop 1.x, where JobContext is a concrete class
- else {
- clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
- }
- Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
- JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
-
- return context;
- } catch(Exception e) {
- throw new Exception("Could not create instance of JobContext.");
- }
- }
-
- public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception {
- try {
- Class<?> clazz;
- // for Hadoop 2.x, where TaskAttemptContext is an interface
- if(JobContext.class.isInterface()) {
- clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader());
- }
- // for Hadoop 1.x, where TaskAttemptContext is a concrete class
- else {
- clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader());
- }
- Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
- TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
-
- return context;
- } catch(Exception e) {
- throw new Exception("Could not create instance of TaskAttemptContext.");
- }
- }
-}
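
HadoopUtils hinges on one observation: JobContext and TaskAttemptContext are concrete classes in Hadoop 1.x but interfaces in Hadoop 2.x, so JobContext.class.isInterface() identifies the running version and reflection picks the matching implementation. A usage sketch, assuming either Hadoop profile is on the classpath (the job and attempt IDs are placeholders):

    import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class HadoopUtilsSketch {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            HadoopUtils.mergeHadoopConf(conf); // pull in the cluster's HDFS settings

            // both calls dispatch to the right class for the Hadoop version at runtime
            JobContext jobContext =
                HadoopUtils.instantiateJobContext(conf, new JobID("jobtracker", 1));
            TaskAttemptContext taskContext = HadoopUtils.instantiateTaskAttemptContext(
                conf, TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0"));

            System.out.println(jobContext.getJobID() + " / " + taskContext.getTaskAttemptID());
        }
    }
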
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
deleted file mode 100644
index b53fc9f..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapreduce.JobContext;
-
-
-public class HadoopInputSplit implements InputSplit {
-
- public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
- public transient JobContext jobContext;
-
- private int splitNumber;
-
- public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
- return mapreduceInputSplit;
- }
-
-
- public HadoopInputSplit() {
- super();
- }
-
-
- public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
- if(!(mapreduceInputSplit instanceof Writable)) {
- throw new IllegalArgumentException("InputSplit must implement Writable interface.");
- }
- this.mapreduceInputSplit = mapreduceInputSplit;
- this.jobContext = jobContext;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeInt(this.splitNumber);
- out.writeUTF(this.mapreduceInputSplit.getClass().getName());
- Writable w = (Writable) this.mapreduceInputSplit;
- w.write(out);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- this.splitNumber = in.readInt();
- String className = in.readUTF();
-
- if(this.mapreduceInputSplit == null) {
- try {
- Class<? extends org.apache.hadoop.io.Writable> inputSplit =
- Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
- this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
- } catch (Exception e) {
- throw new RuntimeException("Unable to create InputSplit", e);
- }
- }
- ((Writable)this.mapreduceInputSplit).readFields(in);
- }
-
- @Override
- public int getSplitNumber() {
- return this.splitNumber;
- }
-
- public void setSplitNumber(int splitNumber) {
- this.splitNumber = splitNumber;
- }
-}
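
HadoopInputSplit makes an arbitrary Hadoop split transportable by writing its class name followed by the split's own Writable serialization, then rebuilding the object reflectively on read. The same round trip in miniature, using plain java.io streams instead of Flink's DataOutputView/DataInputView and a Text value standing in for a real split:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableFactories;

    public class WritableRoundTrip {

        public static void main(String[] args) throws Exception {
            Writable original = new Text("hello");

            // write: the class name first, then the object's own serialization
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(original.getClass().getName());
            original.write(out);

            // read: load the class by name, instantiate it, then restore its fields
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            Class<? extends Writable> clazz =
                Class.forName(in.readUTF()).asSubclass(Writable.class);
            Writable copy = WritableFactories.newInstance(clazz);
            copy.readFields(in);

            System.out.println(copy); // prints "hello"
        }
    }

WritableFactories.newInstance falls back to the no-argument constructor when no factory is registered, which is what the wrapper above relies on for split classes.
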
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java b/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
deleted file mode 100644
index d13d0f2..0000000
--- a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import org.apache.flink.hadoopcompatibility.mapred.example.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopInputOutputITCase extends JavaProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
-
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
- }
-
- @Override
- protected void testProgram() throws Exception {
- WordCount.main(new String[] { textPath, resultPath });
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
deleted file mode 100644
index 547ea60..0000000
--- a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapred.record;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-/**
- * Tests the Hadoop input format and output format wrappers of the Record API.
- */
-public class HadoopRecordInputOutputITCase extends RecordAPITestBase {
- protected String textPath;
- protected String resultPath;
- protected String counts;
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- counts = WordCountData.COUNTS.replaceAll(" ", "\t");
- }
-
- @Override
- protected Plan getTestJob() {
- // WordCountWithOutputFormat reads its input with Hadoop's TextInputFormat and writes its output file with Hadoop's TextOutputFormat
- WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
- return wc.getPlan("1", textPath, resultPath);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- // Check the results; append /1 to resultPath because of the generated _temporary file.
- compareResultsByLinesInMemory(counts, resultPath + "/1");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
deleted file mode 100644
index 10dab3f..0000000
--- a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapreduce;
-
-import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopInputOutputITCase extends JavaProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
-
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
- }
-
- @Override
- protected void testProgram() throws Exception {
- WordCount.main(new String[] { textPath, resultPath });
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/pom.xml b/flink-addons/hbase/pom.xml
deleted file mode 100644
index 4c309f9..0000000
--- a/flink-addons/hbase/pom.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>flink-addons</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>0.6-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <repositories>
- <repository>
- <id>cloudera-releases</id>
- <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
- <properties>
- <hbase.version>0.96.0-hadoop2</hbase.version>
- </properties>
-
- <artifactId>hbase</artifactId>
- <name>hbase</name>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>0.94.2-cdh4.2.1</version>
- <exclusions>
- <!-- jruby is used for the hbase shell. -->
- <exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <!-- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- </dependency>
- -->
-
- <!-- hadoop-client is available for both yarn and non-yarn setups, so there is no need
- to use profiles. See https://issues.apache.org/jira/browse/HADOOP-8009
- for a description of hadoop-client. -->
-
- <reporting>
- <plugins>
- </plugins>
- </reporting>
-
- <build>
- <plugins>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
deleted file mode 100644
index 9029030..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-
-public abstract class GenericTableOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- public static final String JT_ID_KEY = "pact.hbase.jtkey";
-
- public static final String JOB_ID_KEY = "pact.job.id";
-
- private RecordWriter<ImmutableBytesWritable, KeyValue> writer;
-
- private Configuration config;
-
- private org.apache.hadoop.conf.Configuration hadoopConfig;
-
- private TaskAttemptContext context;
-
- private String jtID;
-
- private int jobId;
-
-
- @Override
- public void configure(Configuration parameters) {
- this.config = parameters;
-
- // get the ID parameters
- this.jtID = parameters.getString(JT_ID_KEY, null);
- if (this.jtID == null) {
- throw new RuntimeException("Missing JT_ID entry in hbase config.");
- }
- this.jobId = parameters.getInteger(JOB_ID_KEY, -1);
- if (this.jobId < 0) {
- throw new RuntimeException("Missing or invalid job id in input config.");
- }
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- this.hadoopConfig = getHadoopConfig(this.config);
-
- /**
- * PLEASE NOTE:
- * If you are an Eclipse+Maven Integration user and you see two (or more) warnings here, please
- * close the pact-hbase project OR set the Maven profile to hadoop_yarn.
- *
- * pact-hbase requires hadoop_yarn, but Eclipse cannot parse Maven profiles properly. It therefore
- * imports the pact-hbase project even though it is not included in the standard profile (hadoop_v1).
- */
- final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);
-
- this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
- final HFileOutputFormat outFormat = new HFileOutputFormat();
- try {
- this.writer = outFormat.getRecordWriter(this.context);
- } catch (InterruptedException iex) {
- throw new IOException("Opening the writer was interrupted.", iex);
- }
- }
-
- @Override
- public void close() throws IOException {
- final RecordWriter<ImmutableBytesWritable, KeyValue> writer = this.writer;
- this.writer = null;
- if (writer != null) {
- try {
- writer.close(this.context);
- } catch (InterruptedException iex) {
- throw new IOException("Closing was interrupted.", iex);
- }
- }
- }
-
- public void collectKeyValue(KeyValue kv) throws IOException {
- try {
- this.writer.write(null, kv);
- } catch (InterruptedException iex) {
- throw new IOException("Write request was interrupted.", iex);
- }
- }
-
- public abstract org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config);
-}
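
GenericTableOutputFormat leaves getHadoopConfig (and writeRecord, inherited from OutputFormat) to the subclass. A hypothetical minimal subclass; the ZooKeeper quorum, column family, and record layout are placeholders, not part of the module:

    import java.io.IOException;

    import org.apache.flink.addons.hbase.GenericTableOutputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.types.Record;
    import org.apache.flink.types.StringValue;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;

    public class MyTableOutputFormat extends GenericTableOutputFormat {

        private static final long serialVersionUID = 1L;

        @Override
        public org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config) {
            // start from the standard HBase configuration and point it at the cluster
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "zk-host:2181");
            return conf;
        }

        @Override
        public void writeRecord(Record record) throws IOException {
            // assumed layout: field 0 is the row key, field 1 the cell value
            byte[] row = record.getField(0, StringValue.class).getValue().getBytes();
            byte[] value = record.getField(1, StringValue.class).getValue().getBytes();
            collectKeyValue(new KeyValue(row, "cf".getBytes(), "col".getBytes(), value));
        }
    }
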
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
deleted file mode 100644
index ac41927..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.util.Random;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-
-/**
- * A sink for writing to HBase
- */
-public class HBaseDataSink extends GenericDataSink {
-
- private static final int IDENTIFIER_LEN = 16;
-
- public HBaseDataSink(GenericTableOutputFormat f, Operator input, String name) {
- super(f, input, name);
-
- // generate a random unique identifier string
- final Random rnd = new Random();
- final StringBuilder bld = new StringBuilder();
- for (int i = 0; i < IDENTIFIER_LEN; i++) {
- bld.append((char) (rnd.nextInt(26) + 'a'));
- }
-
- setParameter(GenericTableOutputFormat.JT_ID_KEY, bld.toString());
- setParameter(GenericTableOutputFormat.JOB_ID_KEY, rnd.nextInt());
- }
-}
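
HBaseDataSink itself only seeds the two random identifier parameters that GenericTableOutputFormat reads back in configure(). Wiring it into a Record API plan is then a one-liner; a fragment sketch, assuming the hypothetical MyTableOutputFormat from the previous note, an upstream operator producing Records, and the Plan constructor used by the Record API examples of this era:

    import org.apache.flink.addons.hbase.HBaseDataSink;
    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.common.operators.Operator;

    public class HBaseSinkPlans {

        /** Builds a plan that writes the given operator's Records to HBase. */
        public static Plan buildPlan(Operator input) {
            HBaseDataSink sink = new HBaseDataSink(new MyTableOutputFormat(), input, "HBase Sink");
            return new Plan(sink, "Write to HBase");
        }
    }
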