You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/26 01:20:40 UTC
[2/8] flink git commit: [FLINK-2906] Remove Record API
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
deleted file mode 100644
index 89baa98..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings({ "serial", "deprecation" })
-public class ReduceWrappingFunctionTest {
-
- @SuppressWarnings("unchecked")
- @Test
- public void testWrappedReduceObject() {
- try {
- AtomicInteger methodCounter = new AtomicInteger();
-
- ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction(methodCounter)).build();
-
- RichFunction reducer = (RichFunction) reduceOp.getUserCodeWrapper().getUserCodeObject();
-
- // test the method invocations
- reducer.close();
- reducer.open(new Configuration());
- assertEquals(2, methodCounter.get());
-
- // prepare the reduce / combine tests
- final List<Record> target = new ArrayList<Record>();
- Collector<Record> collector = new Collector<Record>() {
- @Override
- public void collect(Record record) {
- target.add(record);
- }
- @Override
- public void close() {}
- };
-
- List<Record> source = new ArrayList<Record>();
- source.add(new Record(new IntValue(42), new LongValue(11)));
- source.add(new Record(new IntValue(13), new LongValue(17)));
-
- // test reduce
- ((GroupReduceFunction<Record, Record>) reducer).reduce(source, collector);
- assertEquals(2, target.size());
- assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
- assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
- assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
- assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
- target.clear();
-
- // test combine
- ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
- assertEquals(2, target.size());
- assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
- assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
- assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
- assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
- target.clear();
-
- // test the serialization
- SerializationUtils.clone((java.io.Serializable) reducer);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testWrappedReduceClass() {
- try {
- ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
-
- UserCodeWrapper<GroupReduceFunction<Record, Record>> udf = reduceOp.getUserCodeWrapper();
- UserCodeWrapper<GroupReduceFunction<Record, Record>> copy = SerializationUtils.clone(udf);
- GroupReduceFunction<Record, Record> reducer = copy.getUserCodeObject();
-
- // prepare the reduce / combine tests
- final List<Record> target = new ArrayList<Record>();
- Collector<Record> collector = new Collector<Record>() {
- @Override
- public void collect(Record record) {
- target.add(record);
- }
- @Override
- public void close() {}
- };
-
- List<Record> source = new ArrayList<Record>();
- source.add(new Record(new IntValue(42), new LongValue(11)));
- source.add(new Record(new IntValue(13), new LongValue(17)));
-
- // test reduce
- reducer.reduce(source, collector);
- assertEquals(2, target.size());
- assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
- assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
- assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
- assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
- target.clear();
-
- // test combine
- ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector);
- assertEquals(2, target.size());
- assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class));
- assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class));
- assertEquals(new IntValue(13), target.get(1).getField(0, IntValue.class));
- assertEquals(new LongValue(17), target.get(1).getField(1, LongValue.class));
- target.clear();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testExtractSemantics() {
- try {
- {
- ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
-
- SingleInputSemanticProperties props = reduceOp.getSemanticProperties();
- FieldSet fw2 = props.getForwardingTargetFields(0, 2);
- FieldSet fw4 = props.getForwardingTargetFields(0, 4);
- assertNotNull(fw2);
- assertNotNull(fw4);
- assertEquals(1, fw2.size());
- assertEquals(1, fw4.size());
- assertTrue(fw2.contains(2));
- assertTrue(fw4.contains(4));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCombinable() {
- try {
- {
- ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build();
- assertTrue(reduceOp.isCombinable());
- }
- {
- ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build();
- assertTrue(reduceOp.isCombinable());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Combinable
- @ConstantFields({2, 4})
- public static class TestReduceFunction extends ReduceFunction {
-
- private final AtomicInteger methodCounter;
-
- private TestReduceFunction(AtomicInteger methodCounter) {
- this.methodCounter= methodCounter;
- }
-
- public TestReduceFunction() {
- methodCounter = new AtomicInteger();
- }
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- while (records.hasNext()) {
- out.collect(records.next());
- }
- }
-
- @Override
- public void close() throws Exception {
- methodCounter.incrementAndGet();
- super.close();
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- methodCounter.incrementAndGet();
- super.open(parameters);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
deleted file mode 100644
index 5ccfdd9..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvInputFormatTest {
-
- protected File tempFile;
-
- private final CsvInputFormat format = new CsvInputFormat();
-
- //Static variables for testing the removal of \r\n to \n
- private static final String FIRST_PART = "That is the first part";
-
- private static final String SECOND_PART = "That is the second part";
-
- // --------------------------------------------------------------------------------------------
- @Before
- public void setup() {
- format.setFilePath("file:///some/file/that/will/not/be/read");
- }
-
- @After
- public void setdown() throws Exception {
- if (this.format != null) {
- this.format.close();
- }
- if (this.tempFile != null) {
- this.tempFile.delete();
- }
- }
-
- @Test
- public void testConfigureEmptyConfig() {
- try {
- Configuration config = new Configuration();
-
- // empty configuration, plus no fields on the format itself is not valid
- try {
- format.configure(config);
- fail(); // should give an error
- } catch (IllegalConfigurationException e) {
- ; // okay
- }
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void readWithEmptyFieldInstanceParameters() {
- try {
- final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
-
- format.setFieldDelimiter('|');
- format.setFieldTypes(StringValue.class, StringValue.class, StringValue.class);
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- assertNotNull(format.nextRecord(record));
- assertEquals("abc", record.getField(0, StringValue.class).getValue());
- assertEquals("def", record.getField(1, StringValue.class).getValue());
- assertEquals("ghijk", record.getField(2, StringValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals("abc", record.getField(0, StringValue.class).getValue());
- assertEquals("", record.getField(1, StringValue.class).getValue());
- assertEquals("hhg", record.getField(2, StringValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals("", record.getField(0, StringValue.class).getValue());
- assertEquals("", record.getField(1, StringValue.class).getValue());
- assertEquals("", record.getField(2, StringValue.class).getValue());
- }
- catch (Exception ex) {
- ex.printStackTrace();
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void readWithEmptyFieldConfigParameters() {
- try {
- final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
- new CsvInputFormat.ConfigBuilder(null, parameters)
- .field(StringValue.class, 0).field(StringValue.class, 1).field(StringValue.class, 2);
-
- format.setFieldDelimiter("|");
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- assertNotNull(format.nextRecord(record));
- assertEquals("abc", record.getField(0, StringValue.class).getValue());
- assertEquals("def", record.getField(1, StringValue.class).getValue());
- assertEquals("ghijk", record.getField(2, StringValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals("abc", record.getField(0, StringValue.class).getValue());
- assertEquals("", record.getField(1, StringValue.class).getValue());
- assertEquals("hhg", record.getField(2, StringValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals("", record.getField(0, StringValue.class).getValue());
- assertEquals("", record.getField(1, StringValue.class).getValue());
- assertEquals("", record.getField(2, StringValue.class).getValue());
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testReadAll() throws IOException {
- try {
- final String fileContent = "111|222|333|444|555\n666|777|888|999|000|";
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
-
- new CsvInputFormat.ConfigBuilder(null, parameters)
- .fieldDelimiter('|')
- .field(IntValue.class, 0).field(IntValue.class, 1).field(IntValue.class, 2)
- .field(IntValue.class, 3).field(IntValue.class, 4);
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- assertNotNull(format.nextRecord(record));
- assertEquals(111, record.getField(0, IntValue.class).getValue());
- assertEquals(222, record.getField(1, IntValue.class).getValue());
- assertEquals(333, record.getField(2, IntValue.class).getValue());
- assertEquals(444, record.getField(3, IntValue.class).getValue());
- assertEquals(555, record.getField(4, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(666, record.getField(0, IntValue.class).getValue());
- assertEquals(777, record.getField(1, IntValue.class).getValue());
- assertEquals(888, record.getField(2, IntValue.class).getValue());
- assertEquals(999, record.getField(3, IntValue.class).getValue());
- assertEquals(000, record.getField(4, IntValue.class).getValue());
-
- assertNull(format.nextRecord(record));
- assertTrue(format.reachedEnd());
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testReadFirstN() throws IOException {
- try {
- final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|";
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
-
- new CsvInputFormat.ConfigBuilder(null, parameters)
- .fieldDelimiter('|')
- .field(IntValue.class, 0).field(IntValue.class, 1);
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- assertNotNull(format.nextRecord(record));
- assertEquals(111, record.getField(0, IntValue.class).getValue());
- assertEquals(222, record.getField(1, IntValue.class).getValue());
- boolean notParsed = false;
- try {
- record.getField(2, IntValue.class);
- } catch (IndexOutOfBoundsException ioo) {
- notParsed = true;
- }
- assertTrue(notParsed);
-
- assertNotNull(format.nextRecord(record));
- assertEquals(666, record.getField(0, IntValue.class).getValue());
- assertEquals(777, record.getField(1, IntValue.class).getValue());
- notParsed = false;
- try {
- record.getField(2, IntValue.class);
- } catch (IndexOutOfBoundsException ioo) {
- notParsed = true;
- }
- assertTrue(notParsed);
-
- assertNull(format.nextRecord(record));
- assertTrue(format.reachedEnd());
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
-
- }
-
- @Test
- public void testReadSparse() throws IOException {
- try {
- final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
-
- new CsvInputFormat.ConfigBuilder(null, parameters)
- .fieldDelimiter('|')
- .field(IntValue.class, 0).field(IntValue.class, 3).field(IntValue.class, 7);
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- assertNotNull(format.nextRecord(record));
- assertEquals(111, record.getField(0, IntValue.class).getValue());
- assertEquals(444, record.getField(1, IntValue.class).getValue());
- assertEquals(888, record.getField(2, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(000, record.getField(0, IntValue.class).getValue());
- assertEquals(777, record.getField(1, IntValue.class).getValue());
- assertEquals(333, record.getField(2, IntValue.class).getValue());
-
- assertNull(format.nextRecord(record));
- assertTrue(format.reachedEnd());
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testReadSparseShufflePosition() throws IOException {
- try {
- final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
-
- new CsvInputFormat.ConfigBuilder(null, parameters)
- .fieldDelimiter('|')
- .field(IntValue.class, 8).field(IntValue.class, 1).field(IntValue.class, 3);
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- assertNotNull(format.nextRecord(record));
- assertEquals(999, record.getField(0, IntValue.class).getValue());
- assertEquals(222, record.getField(1, IntValue.class).getValue());
- assertEquals(444, record.getField(2, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(222, record.getField(0, IntValue.class).getValue());
- assertEquals(999, record.getField(1, IntValue.class).getValue());
- assertEquals(777, record.getField(2, IntValue.class).getValue());
-
- assertNull(format.nextRecord(record));
- assertTrue(format.reachedEnd());
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- private FileInputSplit createTempFile(String content) throws IOException {
- this.tempFile = File.createTempFile("test_contents", "tmp");
- this.tempFile.deleteOnExit();
-
- DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile));
- dos.writeBytes(content);
- dos.close();
-
- return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
- }
-
- @Test
- public void testWindowsLineEndRemoval() {
-
- //Check typical use case -- linux file is correct and it is set up to linuc(\n)
- this.testRemovingTrailingCR("\n", "\n");
-
- //Check typical windows case -- windows file endings and file has windows file endings set up
- this.testRemovingTrailingCR("\r\n", "\r\n");
-
- //Check problematic case windows file -- windows file endings(\r\n) but linux line endings (\n) set up
- this.testRemovingTrailingCR("\r\n", "\n");
-
- //Check problematic case linux file -- linux file endings (\n) but windows file endings set up (\r\n)
- //Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard.
- }
-
- private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) {
- File tempFile=null;
-
- String fileContent = CsvInputFormatTest.FIRST_PART + lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile;
-
- try {
- // create input file
- tempFile = File.createTempFile("CsvInputFormatTest", "tmp");
- tempFile.deleteOnExit();
- tempFile.setWritable(true);
-
- OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
- wrt.write(fileContent);
- wrt.close();
-
- //Instantiate input format
- CsvInputFormat inputFormat = new CsvInputFormat();
-
- Configuration parameters = new Configuration();
- new CsvInputFormat.ConfigBuilder(null, parameters)
- .field(StringValue.class, 0).filePath(tempFile.toURI().toString());
-
-
- inputFormat.configure(parameters);
-
- inputFormat.setDelimiter(lineBreakerSetup);
-
- FileInputSplit[] splits = inputFormat.createInputSplits(1);
-
- inputFormat.open(splits[0]);
-
- Record record = new Record();
-
- Record result = inputFormat.nextRecord(record);
-
- assertNotNull("Expecting to not return null", result);
-
-
-
- assertEquals(FIRST_PART, result.getField(0, StringValue.class).getValue());
-
- result = inputFormat.nextRecord(record);
-
- assertNotNull("Expecting to not return null", result);
- assertEquals(SECOND_PART, result.getField(0, StringValue.class).getValue());
-
- }
- catch (Throwable t) {
- System.err.println("test failed with exception: " + t.getMessage());
- t.printStackTrace(System.err);
- fail("Test erroneous");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
deleted file mode 100644
index 9eb794f..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvOutputFormatTest {
-
- protected Configuration config;
-
- protected File tempFile;
-
- private final CsvOutputFormat format = new CsvOutputFormat();
-
- // --------------------------------------------------------------------------------------------
-
- @Before
- public void setup() throws IOException {
- this.tempFile = File.createTempFile("test_output", "tmp");
- this.format.setOutputFilePath(new Path(tempFile.toURI()));
- this.format.setWriteMode(WriteMode.OVERWRITE);
- }
-
- @After
- public void setdown() throws Exception {
- if (this.format != null) {
- this.format.close();
- }
- if (this.tempFile != null) {
- this.tempFile.delete();
- }
- }
-
- @Test
- public void testConfigure()
- {
- try {
- Configuration config = new Configuration();
-
- // check missing number of fields
- boolean validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- } catch(IllegalStateException ise) {
- validConfig = false;
- }
- assertFalse(validConfig);
-
- // check missing file parser
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- } catch(IllegalStateException ise) {
- validConfig = false;
- }
- assertFalse(validConfig);
-
- // check valid config
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
- validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- }
- assertTrue(validConfig);
-
- // check invalid file parser config
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 3);
- validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- }
- assertFalse(validConfig);
-
- // check valid config
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, StringValue.class);
- validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- }
- assertTrue(validConfig);
-
- // check valid config
- config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
- validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- System.out.println(iae.getMessage());
- }
- assertTrue(validConfig);
-
- // check invalid text pos config
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
- validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- }
- assertFalse(validConfig);
-
- // check valid text pos config
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 3);
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 9);
- validConfig = true;
- try {
- format.configure(config);
- } catch(IllegalArgumentException iae) {
- validConfig = false;
- }
- assertTrue(validConfig);
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testWriteNoRecPosNoLenient()
- {
- try {
- Configuration config = new Configuration();
- config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
-
- format.configure(config);
-
- try {
- format.open(0, 1);
- } catch (IOException e) {
- fail(e.getMessage());
- }
-
- Record r = new Record(2);
-
- try {
- r.setField(0, new StringValue("Hello World"));
- r.setField(1, new IntValue(42));
- format.writeRecord(r);
-
- r.setField(0, new StringValue("AbCdE"));
- r.setField(1, new IntValue(13));
- format.writeRecord(r);
-
- format.close();
-
- BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-
- assertTrue((dis.readLine()+"\n").equals("Hello World|42\n"));
- assertTrue((dis.readLine()+"\n").equals("AbCdE|13\n"));
-
- dis.close();
-
- } catch (IOException e) {
- fail(e.getMessage());
- }
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testWriteNoRecPosNoLenientFail()
- {
- try {
- Configuration config = new Configuration();
- config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
-
- format.configure(config);
-
- try {
- format.open(0, 1);
- } catch (IOException e) {
- fail(e.getMessage());
- }
-
- Record r = new Record(2);
-
- boolean success = true;
-
- try {
- r.setField(0, new StringValue("Hello World"));
- r.setField(1, new IntValue(42));
- format.writeRecord(r);
-
- r.setNull(0);
- r.setField(1, new IntValue(13));
- format.writeRecord(r);
-
- format.close();
-
- } catch (IOException e) {
- success = false;
- } catch (RuntimeException re) {
- success = false;
- }
-
- assertFalse(success);
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testWriteNoRecPosLenient()
- {
- try {
- Configuration config = new Configuration();
- config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, IntValue.class);
- config.setBoolean(CsvOutputFormat.LENIENT_PARSING, true);
-
- format.configure(config);
-
- try {
- format.open(0, 1);
- } catch (IOException e) {
- fail(e.getMessage());
- }
-
- Record r = new Record(2);
-
- try {
- r.setField(0, new StringValue("Hello World"));
- r.setField(1, new IntValue(42));
- format.writeRecord(r);
-
- r.setNull(0);
- r.setField(1, new IntValue(13));
- format.writeRecord(r);
-
- format.close();
-
- BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-
- assertTrue((dis.readLine()+"\n").equals("Hello World|42\n"));
- assertTrue((dis.readLine()+"\n").equals("|13\n"));
-
- dis.close();
-
- } catch (IOException e) {
- fail(e.getMessage());
- }
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testWriteRecPosNoLenient()
- {
- try {
- Configuration config = new Configuration();
- config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class);
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-
- format.configure(config);
-
- try {
- format.open(0, 1);
- } catch (IOException e) {
- fail(e.getMessage());
- }
-
- Record r = new Record(2);
-
- try {
- r.setField(0, new StringValue("Hello World"));
- r.setField(1, new IntValue(42));
- r.setField(2, new StringValue("Hello User"));
- format.writeRecord(r);
-
- r.setField(0, new StringValue("AbCdE"));
- r.setField(1, new IntValue(13));
- r.setField(2, new StringValue("ZyXvW"));
- format.writeRecord(r);
-
- format.close();
-
- BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-
- assertTrue((dis.readLine()+"\n").equals("Hello User|Hello World\n"));
- assertTrue((dis.readLine()+"\n").equals("ZyXvW|AbCdE\n"));
-
- dis.close();
-
- } catch (IOException e) {
- fail(e.getMessage());
- }
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testWriteRecPosNoLenientFail()
- {
- try {
- Configuration config = new Configuration();
- config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class);
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-
- format.configure(config);
-
- try {
- format.open(0, 1);
- } catch (IOException e) {
- fail(e.getMessage());
- }
-
- Record r = new Record(2);
-
- boolean success = true;
-
- try {
- r.setField(0, new StringValue("Hello World"));
- r.setField(1, new IntValue(42));
- r.setField(2, new StringValue("Hello User"));
- format.writeRecord(r);
-
- r = new Record();
-
- r.setField(0, new StringValue("AbCdE"));
- r.setField(1, new IntValue(13));
- format.writeRecord(r);
-
- format.close();
-
- } catch (IOException e) {
- success = false;
- } catch (RuntimeException re) {
- success = false;
- }
-
- assertFalse(success);
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
- @Test
- public void testWriteRecPosLenient()
- {
- try {
- Configuration config = new Configuration();
- config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
- config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class);
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
- config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, StringValue.class);
- config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
- config.setBoolean(CsvOutputFormat.LENIENT_PARSING, true);
-
- format.configure(config);
-
- try {
- format.open(0, 1);
- } catch (IOException e) {
- fail(e.getMessage());
- }
-
- Record r = new Record(2);
-
- try {
- r.setField(0, new StringValue("Hello World"));
- r.setField(1, new IntValue(42));
- r.setField(2, new StringValue("Hello User"));
- format.writeRecord(r);
-
- r = new Record();
-
- r.setField(0, new StringValue("AbCdE"));
- r.setField(1, new IntValue(13));
- format.writeRecord(r);
-
- format.close();
-
- BufferedReader dis = new BufferedReader(new FileReader(tempFile));
-
- assertTrue((dis.readLine()+"\n").equals("Hello User|Hello World\n"));
- assertTrue((dis.readLine()+"\n").equals("|AbCdE\n"));
-
- dis.close();
-
- } catch (IOException e) {
- fail(e.getMessage());
- }
- }
- catch (Exception ex) {
- Assert.fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
deleted file mode 100644
index 4aec38e..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExternalProcessFixedLengthInputFormatTest {
-
-private ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> format;
-
- private final String neverEndingCommand = "cat /dev/urandom";
- private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 count=1000";
- private final String incompleteRecordsCommand = "dd if=/dev/zero bs=7 count=2";
- private final String failingCommand = "ls /I/do/not/exist";
-
- @Before
- public void prepare() {
- format = new MyExternalProcessTestInputFormat();
- }
-
- @Test
- public void testOpen() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-
- boolean processDestroyed = false;
- try {
- format.configure(config);
- format.open(split);
-
- String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | grep \"cat /dev/urandom\" | wc -l"};
-
- byte[] wcOut = new byte[128];
- Process p = Runtime.getRuntime().exec(cmd);
- p.getInputStream().read(wcOut);
- int pCnt = Integer.parseInt(new String(wcOut).trim());
- Assert.assertTrue(pCnt > 0);
-
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
- processDestroyed = true;
- }
- } finally {
- Assert.assertTrue(processDestroyed);
- }
- }
-
- @Test
- public void testCheckExitCode() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, failingCommand);
-
- format.configure(config);
- boolean invalidExitCode = false;
- try {
- format.open(split);
- format.waitForProcessToFinish();
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (InterruptedException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
- invalidExitCode = true;
- }
- }
- Assert.assertTrue(invalidExitCode);
-
- invalidExitCode = false;
- config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2");
- format.configure(config);
- try {
- format.open(split);
- // wait for process to start...
- Thread.sleep(100);
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (InterruptedException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
- invalidExitCode = true;
- }
- }
- Assert.assertTrue(!invalidExitCode);
-
- }
-
- @Test
- public void testUserCodeTermination() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
- config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 100);
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
- Record record = new Record();
-
- boolean userException = false;
- boolean processDestroyed = false;
- try {
- format.configure(config);
- format.open(split);
- while(!format.reachedEnd()) {
- try {
- format.nextRecord(record);
- } catch(RuntimeException re) {
- userException = true;
- break;
- }
- }
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
- processDestroyed = true;
- }
- } finally {
- Assert.assertTrue(userException && processDestroyed);
- }
- }
-
- @Test
- public void testReadStream() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand);
- Record record = new Record();
-
- int cnt = 0;
- try {
- format.configure(config);
- format.open(split);
- while(!format.reachedEnd()) {
- if (format.nextRecord(record) != null) {
- cnt++;
- }
- }
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- Assert.fail(e.getMessage());
- }
- Assert.assertTrue(cnt == 1000);
- }
-
- @Test
- public void testReadInvalidStream() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.incompleteRecordsCommand);
- Record record = new Record();
-
- boolean incompleteRecordDetected = false;
- @SuppressWarnings("unused")
- int cnt = 0;
- try {
- format.configure(config);
- format.open(split);
- while(!format.reachedEnd()) {
- if (format.nextRecord(record) != null) {
- cnt++;
- }
- }
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().equals("External process produced incomplete record")) {
- incompleteRecordDetected = true;
- } else {
- Assert.fail(e.getMessage());
- }
- }
- Assert.assertTrue(incompleteRecordDetected);
- }
-
- private final class MyExternalProcessTestInputFormat extends ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> {
- private static final long serialVersionUID = 1L;
-
- public static final String FAILCOUNT_PARAMETER_KEY = "test.failingCount";
-
- private long cnt = 0;
- private int failCnt;
-
- @Override
- public void configure(Configuration parameters) {
- super.configure(parameters);
- failCnt = parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE);
- }
-
- @Override
- public boolean readBytes(Record record, byte[] bytes, int startPos) {
-
- if(cnt == failCnt) {
- throw new RuntimeException("This is a test exception!");
- }
-
- int v1 = 0;
- v1 = v1 | (0xFF & bytes[startPos+0]);
- v1 = (v1 << 8) | (0xFF & bytes[startPos+1]);
- v1 = (v1 << 8) | (0xFF & bytes[startPos+2]);
- v1 = (v1 << 8) | (0xFF & bytes[startPos+3]);
-
- int v2 = 0;
- v2 = v2 | (0xFF & bytes[startPos+4]);
- v2 = (v2 << 8) | (0xFF & bytes[startPos+5]);
- v2 = (v2 << 8) | (0xFF & bytes[startPos+6]);
- v2 = (v2 << 8) | (0xFF & bytes[startPos+7]);
-
- record.setField(0,new IntValue(v1));
- record.setField(1,new IntValue(v2));
-
- cnt++;
-
- return true;
- }
-
- @Override
- public ExternalProcessInputSplit[] createInputSplits(int minNumSplits)
- throws IOException {
- return null;
- }
-
- @Override
- public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
- return new DefaultInputSplitAssigner(splits);
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
deleted file mode 100644
index 6b8cacb..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExternalProcessInputFormatTest {
-
- private ExternalProcessInputFormat<ExternalProcessInputSplit> format;
-
- private final String neverEndingCommand = "cat /dev/urandom";
- private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 count=1000";
- private final String failingCommand = "ls /I/do/not/exist";
-
- @Before
- public void prepare() {
- format = new MyExternalProcessTestInputFormat();
- }
-
- @Test
- public void testOpen() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-
- boolean processDestroyed = false;
- try {
- format.configure(config);
- format.open(split);
-
- String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | grep \"cat /dev/urandom\" | wc -l"};
-
- byte[] wcOut = new byte[128];
- Process p = Runtime.getRuntime().exec(cmd);
- p.getInputStream().read(wcOut);
- int pCnt = Integer.parseInt(new String(wcOut).trim());
- Assert.assertTrue(pCnt > 0);
-
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
- processDestroyed = true;
- }
- } finally {
- Assert.assertTrue(processDestroyed);
- }
- }
-
- @Test
- public void testCheckExitCode() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, failingCommand);
-
- format.configure(config);
- boolean invalidExitCode = false;
- try {
- format.open(split);
- format.waitForProcessToFinish();
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (InterruptedException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
- invalidExitCode = true;
- }
- }
- Assert.assertTrue(invalidExitCode);
-
- invalidExitCode = false;
- config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2");
- format.configure(config);
- try {
- format.open(split);
- format.waitForProcessToFinish();
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (InterruptedException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().startsWith("External process did not finish with an allowed exit code:")) {
- invalidExitCode = true;
- }
- }
- Assert.assertTrue(!invalidExitCode);
-
- }
-
- @Test
- public void testUserCodeTermination() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 100);
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
- Record record = new Record();
-
- boolean userException = false;
- boolean processDestroyed = false;
- try {
- format.configure(config);
- format.open(split);
- while(!format.reachedEnd()) {
- try {
- format.nextRecord(record);
- } catch(RuntimeException re) {
- userException = true;
- break;
- }
- }
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- if(e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
- processDestroyed = true;
- }
- } finally {
- Assert.assertTrue(userException && processDestroyed);
- }
- }
-
- @Test
- public void testReadStream() {
-
- if(OperatingSystem.isWindows()) {
- return;
- }
-
- Configuration config = new Configuration();
- ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand);
- Record record = new Record();
-
- int cnt = 0;
- try {
- format.configure(config);
- format.open(split);
- while(!format.reachedEnd()) {
- if (format.nextRecord(record) != null) {
- cnt++;
- }
- }
- format.close();
- } catch (IOException e) {
- Assert.fail();
- } catch (RuntimeException e) {
- Assert.fail(e.getMessage());
- }
- Assert.assertTrue("Expected read count was 1000, actual read count was "+cnt, cnt == 1000);
- }
-
- private final class MyExternalProcessTestInputFormat extends ExternalProcessInputFormat<ExternalProcessInputSplit> {
- private static final long serialVersionUID = 1L;
-
- public static final String FAILCOUNT_PARAMETER_KEY = "test.failingCount";
-
- private byte[] buf = new byte[8];
-
- private long cnt = 0;
- private int failCnt;
- private boolean end;
-
- @Override
- public void configure(Configuration parameters) {
- super.configure(parameters);
- failCnt = parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE);
- }
-
- @Override
- public void open(GenericInputSplit split) throws IOException {
- super.open(split);
-
- this.end = false;
- }
-
- @Override
- public ExternalProcessInputSplit[] createInputSplits(int minNumSplits) {
- return null;
- }
-
- @Override
- public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
- return new DefaultInputSplitAssigner(splits);
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- return null;
- }
-
- @Override
- public Record nextRecord(Record reuse) throws IOException {
-
- if(cnt > failCnt) {
- throw new RuntimeException("This is a test exception!");
- }
-
- int totalReadCnt = 0;
-
- do {
- int readCnt = super.extProcOutStream.read(buf, totalReadCnt, buf.length-totalReadCnt);
-
- if(readCnt == -1) {
- this.end = true;
- return null;
- } else {
- totalReadCnt += readCnt;
- }
-
- } while(totalReadCnt != 8);
-
- int v1 = 0;
- v1 = v1 | (0xFF & buf[0]);
- v1 = (v1 << 8) | (0xFF & buf[1]);
- v1 = (v1 << 8) | (0xFF & buf[2]);
- v1 = (v1 << 8) | (0xFF & buf[3]);
-
- int v2 = 0;
- v2 = v2 | (0xFF & buf[4]);
- v2 = (v2 << 8) | (0xFF & buf[5]);
- v2 = (v2 << 8) | (0xFF & buf[6]);
- v2 = (v2 << 8) | (0xFF & buf[7]);
-
- reuse.setField(0,new IntValue(v1));
- reuse.setField(1,new IntValue(v2));
-
- this.cnt++;
-
- return reuse;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return this.end;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
deleted file mode 100644
index 3d441e6..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FixedLenghtInputFormatTest {
-
- protected Configuration config;
-
- protected File tempFile;
-
- private final FixedLengthInputFormat format = new MyFixedLengthInputFormat();
-
- // --------------------------------------------------------------------------------------------
-
- @Before
- public void setup() {
- format.setFilePath("file:///some/file/that/will/not/be/read");
- }
-
- @After
- public void setdown() throws Exception {
- if (this.format != null) {
- this.format.close();
- }
- if (this.tempFile != null) {
- this.tempFile.delete();
- }
- }
-
- @Test
- public void testOpen() throws IOException {
- final int[] fileContent = {1,2,3,4,5,6,7,8};
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
- parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-
- format.configure(parameters);
- format.open(split);
- assertEquals(0, format.getSplitStart());
- assertEquals(0, format.getReadBufferSize() % 8);
- format.close();
-
- parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 13);
- format.configure(parameters);
- format.close();
- format.open(split);
- assertEquals(0, format.getReadBufferSize() % 13);
- format.close();
-
- parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 27);
- format.configure(parameters);
- format.close();
- format.open(split);
- assertEquals(0, format.getReadBufferSize() % 27);
- format.close();
-
- }
-
- @Test
- public void testRead() throws IOException {
- final int[] fileContent = {1,2,3,4,5,6,7,8};
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
-
- parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- assertNotNull(format.nextRecord(record));
- assertEquals(1, record.getField(0, IntValue.class).getValue());
- assertEquals(2, record.getField(1, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(3, record.getField(0, IntValue.class).getValue());
- assertEquals(4, record.getField(1, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(5, record.getField(0, IntValue.class).getValue());
- assertEquals(6, record.getField(1, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(7, record.getField(0, IntValue.class).getValue());
- assertEquals(8, record.getField(1, IntValue.class).getValue());
-
- assertNull(format.nextRecord(record));
- assertTrue(format.reachedEnd());
- }
-
-
- @Test
- public void testReadFail() throws IOException {
- final int[] fileContent = {1,2,3,4,5,6,7,8,9};
- final FileInputSplit split = createTempFile(fileContent);
-
- final Configuration parameters = new Configuration();
- parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-
- format.configure(parameters);
- format.open(split);
-
- Record record = new Record();
-
- try {
- assertNotNull(format.nextRecord(record));
- assertEquals(1, record.getField(0, IntValue.class).getValue());
- assertEquals(2, record.getField(1, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(3, record.getField(0, IntValue.class).getValue());
- assertEquals(4, record.getField(1, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(5, record.getField(0, IntValue.class).getValue());
- assertEquals(6, record.getField(1, IntValue.class).getValue());
-
- assertNotNull(format.nextRecord(record));
- assertEquals(7, record.getField(0, IntValue.class).getValue());
- assertEquals(8, record.getField(1, IntValue.class).getValue());
-
- assertNull(format.nextRecord(record));
- } catch(IOException ioe) {
- assertTrue(ioe.getMessage().equals("Unable to read full record"));
- }
- }
-
-
- private FileInputSplit createTempFile(int[] contents) throws IOException {
- this.tempFile = File.createTempFile("test_contents", "tmp");
- this.tempFile.deleteOnExit();
-
- DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile));
-
- for(int i : contents) {
- dos.writeInt(i);
- }
-
- dos.close();
-
- return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
- }
-
-
- private final class MyFixedLengthInputFormat extends FixedLengthInputFormat {
- private static final long serialVersionUID = 1L;
-
- IntValue p1 = new IntValue();
- IntValue p2 = new IntValue();
-
- @Override
- public boolean readBytes(Record target, byte[] buffer, int startPos) {
- int v1 = 0;
- v1 = (v1 | buffer[startPos+0]) << 8;
- v1 = (v1 | buffer[startPos+1]) << 8;
- v1 = (v1 | buffer[startPos+2]) << 8;
- v1 = (v1 | buffer[startPos+3]);
- p1.setValue(v1);
-
- int v2 = 0;
- v2 = (v2 | buffer[startPos+4]) << 8;
- v2 = (v2 | buffer[startPos+5]) << 8;
- v2 = (v2 | buffer[startPos+6]) << 8;
- v2 = (v2 | buffer[startPos+7]);
- p2.setValue(v2);
-
- target.setField(0, p1);
- target.setField(1, p2);
-
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
deleted file mode 100644
index 8ca19cf..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.OutputStreamWriter;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-public class TextInputFormatTest {
- /**
- * The TextInputFormat seems to fail reading more than one record. I guess its
- * an off by one error.
- *
- * The easiest workaround is to setParameter(TextInputFormat.CHARSET_NAME, "ASCII");
- */
- @Test
- public void testPositionBug() {
- final String FIRST = "First line";
- final String SECOND = "Second line";
-
- try {
- // create input file
- File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
- tempFile.deleteOnExit();
- tempFile.setWritable(true);
-
- FileWriter writer = new FileWriter(tempFile);
- writer.append(FIRST).append('\n');
- writer.append(SECOND).append('\n');
- writer.close();
-
- TextInputFormat inputFormat = new TextInputFormat();
- inputFormat.setFilePath(tempFile.toURI().toString());
-
- Configuration parameters = new Configuration();
- inputFormat.configure(parameters);
-
- FileInputSplit[] splits = inputFormat.createInputSplits(1);
- assertTrue("expected at least one input split", splits.length >= 1);
-
- inputFormat.open(splits[0]);
-
- Record r = new Record();
- assertNotNull("Expecting first record here", inputFormat.nextRecord(r));
- assertEquals(FIRST, r.getField(0, StringValue.class).getValue());
-
- assertNotNull("Expecting second record here",inputFormat.nextRecord(r ));
- assertEquals(SECOND, r.getField(0, StringValue.class).getValue());
-
- assertNull("The input file is over", inputFormat.nextRecord(r));
- }
- catch (Throwable t) {
- System.err.println("test failed with exception: " + t.getMessage());
- t.printStackTrace(System.err);
- fail("Test erroneous");
- }
- }
-
-
- /**
- * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed
- */
- @Test
- public void testRemovingTrailingCR() {
-
- testRemovingTrailingCR("\n","\n");
- testRemovingTrailingCR("\r\n","\n");
-
- testRemovingTrailingCR("|","|");
- testRemovingTrailingCR("|","\n");
-
- }
-
- private void testRemovingTrailingCR(String lineBreaker,String delimiter) {
- File tempFile;
-
- String FIRST = "First line";
- String SECOND = "Second line";
- String CONTENT = FIRST + lineBreaker + SECOND + lineBreaker;
-
- try {
- // create input file
- tempFile = File.createTempFile("TextInputFormatTest", "tmp");
- tempFile.deleteOnExit();
- tempFile.setWritable(true);
-
- OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
- wrt.write(CONTENT);
- wrt.close();
-
- TextInputFormat inputFormat = new TextInputFormat();
- inputFormat.setFilePath(tempFile.toURI().toString());
-
- Configuration parameters = new Configuration();
- inputFormat.configure(parameters);
-
- inputFormat.setDelimiter(delimiter);
-
- FileInputSplit[] splits = inputFormat.createInputSplits(1);
-
- inputFormat.open(splits[0]);
-
- Record r = new Record();
- if ( (delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n") ) )
- || (lineBreaker.equals(delimiter)) ){
-
- assertNotNull("Expecting first record here", inputFormat.nextRecord(r));
- assertEquals(FIRST, r.getField(0, StringValue.class).getValue());
-
- assertNotNull("Expecting second record here",inputFormat.nextRecord(r ));
- assertEquals(SECOND, r.getField(0, StringValue.class).getValue());
-
- assertNull("The input file is over", inputFormat.nextRecord(r));
- } else {
- assertNotNull("Expecting first record here", inputFormat.nextRecord(r));
- assertEquals(CONTENT, r.getField(0, StringValue.class).getValue());
- }
-
-
- }
- catch (Throwable t) {
- System.err.println("test failed with exception: " + t.getMessage());
- t.printStackTrace(System.err);
- fail("Test erroneous");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 5ff9eaf..78d61d1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -169,8 +169,7 @@ public abstract class CostEstimator {
switch (n.getDriverStrategy()) {
case NONE:
case UNARY_NO_OP:
- case BINARY_NO_OP:
- case COLLECTOR_MAP:
+ case BINARY_NO_OP:
case MAP:
case MAP_PARTITION:
case FLAT_MAP:
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
deleted file mode 100644
index 9c1bcd3..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.CollectorMapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Map</i> operator node.
- */
-public class CollectorMapNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> possibleProperties;
-
-
- public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) {
- super(operator);
-
- this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor());
- }
-
- @Override
- public String getOperatorName() {
- return "Map";
- }
-
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.possibleProperties;
- }
-
- /**
- * Computes the estimates for the Map operator. Map takes one value and transforms it into another value.
- * The cardinality consequently stays the same.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
deleted file mode 100644
index bcd4d73..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CollectorMapDescriptor extends OperatorDescriptorSingle {
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.COLLECTOR_MAP;
- }
-
- @Override
- public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
- }
-
- @Override
- protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setAnyDistribution();
- return Collections.singletonList(rgp);
- }
-
- @Override
- protected List<RequestedLocalProperties> createPossibleLocalProperties() {
- return Collections.singletonList(new RequestedLocalProperties());
- }
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
- if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
- gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
- {
- gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
- }
- gProps.clearUniqueFieldCombinations();
- return gProps;
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties lProps) {
- return lProps.clearUniqueFieldSets();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index aaabac5..fc5eb21 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -418,7 +418,6 @@ public class PlanJSONDumpGenerator {
locString = "No-Op";
break;
- case COLLECTOR_MAP:
case MAP:
locString = "Map";
break;
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
index d5ddf4d..1125c29 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
@@ -57,7 +57,6 @@ public class JsonMapper {
case UNARY_NO_OP:
return "No-Op";
- case COLLECTOR_MAP:
case MAP:
return "Map";
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index bcdee14..3f3eae1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -47,7 +47,6 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
import org.apache.flink.optimizer.dag.CoGroupNode;
import org.apache.flink.optimizer.dag.CoGroupRawNode;
-import org.apache.flink.optimizer.dag.CollectorMapNode;
import org.apache.flink.optimizer.dag.CrossNode;
import org.apache.flink.optimizer.dag.DagConnection;
import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -144,9 +143,6 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
else if (c instanceof MapPartitionOperatorBase) {
n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
}
- else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
- n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
- }
else if (c instanceof FlatMapOperatorBase) {
n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
deleted file mode 100644
index 60bc798..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Map task which is executed by a Task Manager. The task has a single
- * input and one or multiple outputs. It is provided with a MapFunction
- * implementation.
- * <p>
- * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method
- * of the MapFunction.
- *
- * @see GenericCollectorMap
- *
- * @param <IT> The mapper's input data type.
- * @param <OT> The mapper's output data type.
- */
-@SuppressWarnings("deprecation")
-public class CollectorMapDriver<IT, OT> implements Driver<GenericCollectorMap<IT, OT>, OT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(CollectorMapDriver.class);
-
-
- private TaskContext<GenericCollectorMap<IT, OT>, OT> taskContext;
-
- private volatile boolean running;
-
- private boolean objectReuseEnabled = false;
-
- @Override
- public void setup(TaskContext<GenericCollectorMap<IT, OT>, OT> context) {
- this.taskContext = context;
- this.running = true;
- }
-
- @Override
- public int getNumberOfInputs() {
- return 1;
- }
-
- @Override
- public Class<GenericCollectorMap<IT, OT>> getStubType() {
- @SuppressWarnings("unchecked")
- final Class<GenericCollectorMap<IT, OT>> clazz = (Class<GenericCollectorMap<IT, OT>>) (Class<?>) GenericCollectorMap.class;
- return clazz;
- }
-
- @Override
- public int getNumberOfDriverComparators() {
- return 0;
- }
-
- @Override
- public void prepare() {
- ExecutionConfig executionConfig = taskContext.getExecutionConfig();
- this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("CollectorMapDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
- }
- }
-
- @Override
- public void run() throws Exception {
- // cache references on the stack
- final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
- final GenericCollectorMap<IT, OT> stub = this.taskContext.getStub();
- final Collector<OT> output = this.taskContext.getOutputCollector();
-
- if (objectReuseEnabled) {
- IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
-
-
- while (this.running && ((record = input.next(record)) != null)) {
- stub.map(record, output);
- }
- } else {
- IT record;
- while (this.running && ((record = input.next()) != null)) {
- stub.map(record, output);
- }
- }
- }
-
- @Override
- public void cleanup() {
- // mappers need no cleanup, since no strategies are used.
- }
-
- @Override
- public void cancel() {
- this.running = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index b069f12..12da126 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -23,7 +23,6 @@ import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING;
import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED;
import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver;
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver;
import org.apache.flink.runtime.operators.chaining.ChainedMapDriver;
@@ -40,8 +39,6 @@ public enum DriverStrategy {
// a binary no-op operator. non implementation available
BINARY_NO_OP(null, null, PIPELINED, PIPELINED, 0),
- // the old mapper
- COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, 0),
// the proper mapper
MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0),
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
deleted file mode 100644
index 8900ed7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.chaining;
-
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.BatchTask;
-
-@SuppressWarnings("deprecation")
-public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
-
- private GenericCollectorMap<IT, OT> mapper;
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void setup(AbstractInvokable parent) {
- @SuppressWarnings("unchecked")
- final GenericCollectorMap<IT, OT> mapper =
- BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class);
- this.mapper = mapper;
- mapper.setRuntimeContext(getUdfRuntimeContext());
- }
-
- @Override
- public void openTask() throws Exception {
- Configuration stubConfig = this.config.getStubParameters();
- BatchTask.openUserCode(this.mapper, stubConfig);
- }
-
- @Override
- public void closeTask() throws Exception {
- BatchTask.closeUserCode(this.mapper);
- }
-
- @Override
- public void cancelTask() {
- try {
- this.mapper.close();
- } catch (Throwable t) {
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- public RichFunction getStub() {
- return this.mapper;
- }
-
- public String getTaskName() {
- return this.taskName;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void collect(IT record) {
- try {
- this.mapper.map(record, this.outputCollector);
- } catch (Exception ex) {
- throw new ExceptionInChainedStubException(this.taskName, ex);
- }
- }
-
- @Override
- public void close() {
- this.outputCollector.close();
- }
-}