You are viewing a plain text version of this content. The canonical link was removed in this plain-text export; see the commits@flink.apache.org archive for the original message.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:41 UTC
[51/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
[FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
This closes #2897.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de4fe3b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de4fe3b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de4fe3b7
Branch: refs/heads/master
Commit: de4fe3b7392948807753d65d13f3da968e6c7de0
Parents: cc006ff
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 29 13:57:30 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 2 14:28:35 2016 +0100
----------------------------------------------------------------------
.gitignore | 2 +-
flink-batch-connectors/flink-avro/pom.xml | 216 --
.../apache/flink/api/avro/DataInputDecoder.java | 213 --
.../flink/api/avro/DataOutputEncoder.java | 183 --
.../api/avro/FSDataInputStreamWrapper.java | 68 -
.../flink/api/java/io/AvroInputFormat.java | 207 --
.../flink/api/java/io/AvroOutputFormat.java | 189 --
.../src/test/assembly/test-assembly.xml | 36 -
.../api/avro/AvroExternalJarProgramITCase.java | 80 -
.../flink/api/avro/AvroOutputFormatITCase.java | 176 --
.../flink/api/avro/EncoderDecoderTest.java | 528 -----
.../avro/testjar/AvroExternalJarProgram.java | 219 --
.../apache/flink/api/io/avro/AvroPojoTest.java | 255 ---
.../api/io/avro/AvroRecordInputFormatTest.java | 458 ----
.../io/avro/AvroSplittableInputFormatTest.java | 326 ---
.../api/io/avro/example/AvroTypeExample.java | 108 -
.../apache/flink/api/io/avro/example/User.java | 269 ---
.../io/AvroInputFormatTypeExtractionTest.java | 81 -
.../flink/api/java/io/AvroOutputFormatTest.java | 154 --
.../src/test/resources/avro/user.avsc | 35 -
.../src/test/resources/log4j-test.properties | 27 -
.../src/test/resources/logback-test.xml | 29 -
.../flink-avro/src/test/resources/testdata.avro | Bin 4572 -> 0 bytes
.../flink-hadoop-compatibility/pom.xml | 182 --
.../api/java/typeutils/WritableTypeInfo.java | 154 --
.../typeutils/runtime/WritableComparator.java | 188 --
.../typeutils/runtime/WritableSerializer.java | 152 --
.../flink/hadoopcompatibility/HadoopInputs.java | 118 --
.../flink/hadoopcompatibility/HadoopUtils.java | 52 -
.../mapred/HadoopMapFunction.java | 133 --
.../mapred/HadoopReduceCombineFunction.java | 168 --
.../mapred/HadoopReduceFunction.java | 142 --
.../mapred/wrapper/HadoopOutputCollector.java | 59 -
.../wrapper/HadoopTupleUnwrappingIterator.java | 94 -
.../scala/HadoopInputs.scala | 143 --
.../java/typeutils/WritableExtractionTest.java | 206 --
.../java/typeutils/WritableInfoParserTest.java | 84 -
.../java/typeutils/WritableTypeInfoTest.java | 72 -
.../typeutils/runtime/StringArrayWritable.java | 83 -
.../runtime/WritableComparatorTest.java | 53 -
.../runtime/WritableComparatorUUIDTest.java | 46 -
.../api/java/typeutils/runtime/WritableID.java | 78 -
.../runtime/WritableSerializerTest.java | 50 -
.../runtime/WritableSerializerUUIDTest.java | 50 -
.../hadoopcompatibility/HadoopUtilsTest.java | 34 -
.../mapred/HadoopMapFunctionITCase.java | 182 --
.../mapred/HadoopMapredITCase.java | 47 -
.../HadoopReduceCombineFunctionITCase.java | 265 ---
.../mapred/HadoopReduceFunctionITCase.java | 213 --
.../mapred/HadoopTestData.java | 62 -
.../example/HadoopMapredCompatWordCount.java | 133 --
.../HadoopTupleUnwrappingIteratorTest.java | 139 --
.../mapreduce/HadoopInputOutputITCase.java | 47 -
.../mapreduce/example/WordCount.java | 119 --
.../src/test/resources/log4j-test.properties | 27 -
.../src/test/resources/logback-test.xml | 29 -
flink-batch-connectors/flink-hbase/pom.xml | 264 ---
.../flink/addons/hbase/TableInputFormat.java | 289 ---
.../flink/addons/hbase/TableInputSplit.java | 89 -
.../hbase/HBaseTestingClusterAutostarter.java | 238 ---
.../addons/hbase/TableInputFormatITCase.java | 120 --
.../hbase/example/HBaseFlinkTestConstants.java | 28 -
.../addons/hbase/example/HBaseReadExample.java | 92 -
.../addons/hbase/example/HBaseWriteExample.java | 202 --
.../hbase/example/HBaseWriteStreamExample.java | 113 -
.../src/test/resources/log4j-test.properties | 23 -
flink-batch-connectors/flink-hcatalog/pom.xml | 182 --
.../flink/hcatalog/HCatInputFormatBase.java | 410 ----
.../flink/hcatalog/java/HCatInputFormat.java | 160 --
.../flink/hcatalog/scala/HCatInputFormat.scala | 229 --
flink-batch-connectors/flink-jdbc/pom.xml | 66 -
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 404 ----
.../api/java/io/jdbc/JDBCOutputFormat.java | 315 ---
.../split/GenericParameterValuesProvider.java | 44 -
.../split/NumericBetweenParametersProvider.java | 66 -
.../io/jdbc/split/ParameterValuesProvider.java | 35 -
.../flink/api/java/io/jdbc/JDBCFullTest.java | 101 -
.../api/java/io/jdbc/JDBCInputFormatTest.java | 247 ---
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 169 --
.../flink/api/java/io/jdbc/JDBCTestBase.java | 183 --
.../src/test/resources/log4j-test.properties | 19 -
.../src/test/resources/logback-test.xml | 29 -
flink-batch-connectors/pom.xml | 45 -
flink-connectors/flink-avro/pom.xml | 216 ++
.../apache/flink/api/avro/DataInputDecoder.java | 213 ++
.../flink/api/avro/DataOutputEncoder.java | 183 ++
.../api/avro/FSDataInputStreamWrapper.java | 68 +
.../flink/api/java/io/AvroInputFormat.java | 207 ++
.../flink/api/java/io/AvroOutputFormat.java | 189 ++
.../src/test/assembly/test-assembly.xml | 36 +
.../api/avro/AvroExternalJarProgramITCase.java | 80 +
.../flink/api/avro/AvroOutputFormatITCase.java | 176 ++
.../flink/api/avro/EncoderDecoderTest.java | 528 +++++
.../avro/testjar/AvroExternalJarProgram.java | 219 ++
.../apache/flink/api/io/avro/AvroPojoTest.java | 255 +++
.../api/io/avro/AvroRecordInputFormatTest.java | 458 ++++
.../io/avro/AvroSplittableInputFormatTest.java | 326 +++
.../api/io/avro/example/AvroTypeExample.java | 108 +
.../apache/flink/api/io/avro/example/User.java | 269 +++
.../io/AvroInputFormatTypeExtractionTest.java | 81 +
.../flink/api/java/io/AvroOutputFormatTest.java | 154 ++
.../src/test/resources/avro/user.avsc | 35 +
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/logback-test.xml | 29 +
.../flink-avro/src/test/resources/testdata.avro | Bin 0 -> 4572 bytes
.../flink-connector-cassandra/pom.xml | 179 ++
.../cassandra/CassandraInputFormat.java | 131 ++
.../cassandra/CassandraOutputFormat.java | 125 ++
.../cassandra/CassandraCommitter.java | 151 ++
.../connectors/cassandra/CassandraPojoSink.java | 67 +
.../connectors/cassandra/CassandraSink.java | 329 +++
.../connectors/cassandra/CassandraSinkBase.java | 98 +
.../cassandra/CassandraTupleSink.java | 59 +
.../cassandra/CassandraTupleWriteAheadSink.java | 159 ++
.../connectors/cassandra/ClusterBuilder.java | 43 +
.../cassandra/example/BatchExample.java | 77 +
.../cassandra/CassandraConnectorITCase.java | 440 ++++
.../CassandraTupleWriteAheadSinkTest.java | 127 ++
.../streaming/connectors/cassandra/Pojo.java | 65 +
.../example/CassandraPojoSinkExample.java | 62 +
.../example/CassandraTupleSinkExample.java | 62 +
.../CassandraTupleWriteAheadSinkExample.java | 96 +
.../connectors/cassandra/example/Message.java | 56 +
.../src/test/resources/cassandra.yaml | 43 +
.../src/test/resources/log4j-test.properties | 29 +
.../flink-connector-elasticsearch/pom.xml | 90 +
.../elasticsearch/ElasticsearchSink.java | 315 +++
.../elasticsearch/IndexRequestBuilder.java | 66 +
.../elasticsearch/ElasticsearchSinkITCase.java | 205 ++
.../examples/ElasticsearchExample.java | 80 +
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-connector-elasticsearch2/pom.xml | 83 +
.../elasticsearch2/BulkProcessorIndexer.java | 35 +
.../elasticsearch2/ElasticsearchSink.java | 257 +++
.../ElasticsearchSinkFunction.java | 60 +
.../elasticsearch2/RequestIndexer.java | 25 +
.../elasticsearch2/ElasticsearchSinkITCase.java | 233 ++
.../examples/ElasticsearchExample.java | 90 +
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-connector-filesystem/pom.xml | 163 ++
.../connectors/fs/AvroKeyValueSinkWriter.java | 309 +++
.../flink/streaming/connectors/fs/Bucketer.java | 55 +
.../flink/streaming/connectors/fs/Clock.java | 33 +
.../connectors/fs/DateTimeBucketer.java | 126 ++
.../connectors/fs/NonRollingBucketer.java | 47 +
.../streaming/connectors/fs/RollingSink.java | 916 ++++++++
.../connectors/fs/SequenceFileWriter.java | 151 ++
.../connectors/fs/StreamWriterBase.java | 152 ++
.../streaming/connectors/fs/StringWriter.java | 86 +
.../streaming/connectors/fs/SystemClock.java | 29 +
.../flink/streaming/connectors/fs/Writer.java | 73 +
.../fs/bucketing/BasePathBucketer.java | 39 +
.../connectors/fs/bucketing/Bucketer.java | 47 +
.../connectors/fs/bucketing/BucketingSink.java | 1082 ++++++++++
.../fs/bucketing/DateTimeBucketer.java | 102 +
.../src/main/resources/log4j.properties | 27 +
.../fs/RollingSinkFaultToleranceITCase.java | 300 +++
.../connectors/fs/RollingSinkITCase.java | 991 +++++++++
.../connectors/fs/RollingSinkSecuredITCase.java | 252 +++
.../BucketingSinkFaultToleranceITCase.java | 297 +++
.../fs/bucketing/BucketingSinkTest.java | 867 ++++++++
.../src/test/resources/log4j-test.properties | 29 +
.../src/test/resources/logback-test.xml | 30 +
flink-connectors/flink-connector-flume/pom.xml | 175 ++
.../streaming/connectors/flume/FlumeSink.java | 141 ++
.../flink-connector-kafka-0.10/pom.xml | 205 ++
.../connectors/kafka/FlinkKafkaConsumer010.java | 153 ++
.../connectors/kafka/FlinkKafkaProducer010.java | 398 ++++
.../kafka/Kafka010JsonTableSource.java | 71 +
.../connectors/kafka/Kafka010TableSource.java | 75 +
.../kafka/internal/Kafka010Fetcher.java | 104 +
.../internal/KafkaConsumerCallBridge010.java | 40 +
.../src/main/resources/log4j.properties | 29 +
.../connectors/kafka/Kafka010FetcherTest.java | 484 +++++
.../connectors/kafka/Kafka010ITCase.java | 313 +++
.../kafka/Kafka010ProducerITCase.java | 33 +
.../kafka/KafkaTestEnvironmentImpl.java | 420 ++++
.../src/test/resources/log4j-test.properties | 30 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-connector-kafka-0.8/pom.xml | 219 ++
.../connectors/kafka/FlinkKafkaConsumer08.java | 398 ++++
.../connectors/kafka/FlinkKafkaConsumer081.java | 39 +
.../connectors/kafka/FlinkKafkaConsumer082.java | 39 +
.../connectors/kafka/FlinkKafkaProducer.java | 64 +
.../connectors/kafka/FlinkKafkaProducer08.java | 145 ++
.../connectors/kafka/Kafka08JsonTableSink.java | 52 +
.../kafka/Kafka08JsonTableSource.java | 71 +
.../connectors/kafka/Kafka08TableSource.java | 75 +
.../kafka/internals/ClosableBlockingQueue.java | 507 +++++
.../kafka/internals/Kafka08Fetcher.java | 481 +++++
.../kafka/internals/KillerWatchDog.java | 62 +
.../kafka/internals/PartitionInfoFetcher.java | 66 +
.../internals/PeriodicOffsetCommitter.java | 85 +
.../kafka/internals/SimpleConsumerThread.java | 504 +++++
.../kafka/internals/ZookeeperOffsetHandler.java | 164 ++
.../connectors/kafka/Kafka08ITCase.java | 248 +++
.../kafka/Kafka08JsonTableSinkTest.java | 48 +
.../kafka/Kafka08JsonTableSourceTest.java | 45 +
.../connectors/kafka/Kafka08ProducerITCase.java | 32 +
.../connectors/kafka/KafkaConsumer08Test.java | 139 ++
.../connectors/kafka/KafkaLocalSystemTime.java | 48 +
.../connectors/kafka/KafkaProducerTest.java | 123 ++
.../kafka/KafkaShortRetention08ITCase.java | 34 +
.../kafka/KafkaTestEnvironmentImpl.java | 401 ++++
.../internals/ClosableBlockingQueueTest.java | 603 ++++++
.../src/test/resources/log4j-test.properties | 30 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-connector-kafka-0.9/pom.xml | 212 ++
.../connectors/kafka/FlinkKafkaConsumer09.java | 269 +++
.../connectors/kafka/FlinkKafkaProducer09.java | 137 ++
.../connectors/kafka/Kafka09JsonTableSink.java | 50 +
.../kafka/Kafka09JsonTableSource.java | 71 +
.../connectors/kafka/Kafka09TableSource.java | 75 +
.../connectors/kafka/internal/Handover.java | 214 ++
.../kafka/internal/Kafka09Fetcher.java | 241 +++
.../kafka/internal/KafkaConsumerCallBridge.java | 41 +
.../kafka/internal/KafkaConsumerThread.java | 332 +++
.../src/main/resources/log4j.properties | 29 +
.../connectors/kafka/Kafka09FetcherTest.java | 482 +++++
.../connectors/kafka/Kafka09ITCase.java | 129 ++
.../kafka/Kafka09JsonTableSinkTest.java | 48 +
.../kafka/Kafka09JsonTableSourceTest.java | 45 +
.../connectors/kafka/Kafka09ProducerITCase.java | 32 +
.../kafka/Kafka09SecuredRunITCase.java | 62 +
.../connectors/kafka/KafkaProducerTest.java | 126 ++
.../kafka/KafkaTestEnvironmentImpl.java | 439 ++++
.../connectors/kafka/internal/HandoverTest.java | 387 ++++
.../src/test/resources/log4j-test.properties | 32 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-connector-kafka-base/pom.xml | 212 ++
.../kafka/FlinkKafkaConsumerBase.java | 552 +++++
.../kafka/FlinkKafkaProducerBase.java | 386 ++++
.../connectors/kafka/KafkaJsonTableSink.java | 47 +
.../connectors/kafka/KafkaJsonTableSource.java | 97 +
.../connectors/kafka/KafkaTableSink.java | 127 ++
.../connectors/kafka/KafkaTableSource.java | 155 ++
.../kafka/internals/AbstractFetcher.java | 552 +++++
.../kafka/internals/ExceptionProxy.java | 125 ++
.../kafka/internals/KafkaTopicPartition.java | 120 ++
.../internals/KafkaTopicPartitionLeader.java | 98 +
.../internals/KafkaTopicPartitionState.java | 118 ++
...picPartitionStateWithPeriodicWatermarks.java | 71 +
...cPartitionStateWithPunctuatedWatermarks.java | 84 +
.../connectors/kafka/internals/TypeUtil.java | 38 +
.../internals/metrics/KafkaMetricWrapper.java | 37 +
.../kafka/partitioner/FixedPartitioner.java | 76 +
.../kafka/partitioner/KafkaPartitioner.java | 41 +
.../JSONDeserializationSchema.java | 46 +
.../JSONKeyValueDeserializationSchema.java | 72 +
.../JsonRowDeserializationSchema.java | 135 ++
.../JsonRowSerializationSchema.java | 70 +
.../KeyedDeserializationSchema.java | 52 +
.../KeyedDeserializationSchemaWrapper.java | 51 +
.../serialization/KeyedSerializationSchema.java | 55 +
.../KeyedSerializationSchemaWrapper.java | 48 +
...eInformationKeyValueSerializationSchema.java | 196 ++
.../kafka/FlinkKafkaConsumerBaseTest.java | 416 ++++
.../kafka/FlinkKafkaProducerBaseTest.java | 288 +++
.../kafka/JSONDeserializationSchemaTest.java | 41 +
.../JSONKeyValueDeserializationSchemaTest.java | 68 +
.../kafka/JsonRowDeserializationSchemaTest.java | 124 ++
.../kafka/JsonRowSerializationSchemaTest.java | 98 +
.../KafkaConsumerPartitionAssignmentTest.java | 269 +++
.../connectors/kafka/KafkaConsumerTestBase.java | 2006 ++++++++++++++++++
.../connectors/kafka/KafkaProducerTestBase.java | 193 ++
.../kafka/KafkaShortRetentionTestBase.java | 291 +++
.../kafka/KafkaTableSinkTestBase.java | 106 +
.../kafka/KafkaTableSourceTestBase.java | 77 +
.../connectors/kafka/KafkaTestBase.java | 203 ++
.../connectors/kafka/KafkaTestEnvironment.java | 112 +
.../connectors/kafka/TestFixedPartitioner.java | 104 +
.../AbstractFetcherTimestampsTest.java | 320 +++
.../internals/KafkaTopicPartitionTest.java | 57 +
.../kafka/testutils/DataGenerators.java | 227 ++
.../kafka/testutils/FailingIdentityMapper.java | 115 +
.../testutils/FakeStandardProducerConfig.java | 34 +
.../testutils/JobManagerCommunicationUtils.java | 120 ++
.../testutils/PartitionValidatingMapper.java | 53 +
.../kafka/testutils/ThrottledMapper.java | 44 +
.../kafka/testutils/Tuple2Partitioner.java | 48 +
.../testutils/ValidatingExactlyOnceSink.java | 82 +
.../testutils/ZooKeeperStringSerializer.java | 51 +
.../src/test/resources/log4j-test.properties | 29 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-connector-kinesis/pom.xml | 164 ++
.../kinesis/FlinkKinesisConsumer.java | 304 +++
.../kinesis/FlinkKinesisProducer.java | 292 +++
.../connectors/kinesis/KinesisPartitioner.java | 49 +
.../kinesis/config/AWSConfigConstants.java | 70 +
.../kinesis/config/ConsumerConfigConstants.java | 138 ++
.../kinesis/config/ProducerConfigConstants.java | 33 +
.../kinesis/examples/ConsumeFromKinesis.java | 54 +
.../kinesis/examples/ProduceIntoKinesis.java | 77 +
.../kinesis/internals/KinesisDataFetcher.java | 679 ++++++
.../kinesis/internals/ShardConsumer.java | 287 +++
.../kinesis/model/KinesisStreamShard.java | 133 ++
.../kinesis/model/KinesisStreamShardState.java | 71 +
.../kinesis/model/SentinelSequenceNumber.java | 51 +
.../kinesis/model/SequenceNumber.java | 104 +
.../kinesis/proxy/GetShardListResult.java | 75 +
.../connectors/kinesis/proxy/KinesisProxy.java | 338 +++
.../kinesis/proxy/KinesisProxyInterface.java | 69 +
.../KinesisDeserializationSchema.java | 57 +
.../KinesisDeserializationSchemaWrapper.java | 57 +
.../KinesisSerializationSchema.java | 45 +
.../connectors/kinesis/util/AWSUtil.java | 130 ++
.../kinesis/util/KinesisConfigUtil.java | 218 ++
.../src/main/resources/log4j.properties | 27 +
.../kinesis/FlinkKinesisConsumerTest.java | 472 +++++
.../internals/KinesisDataFetcherTest.java | 510 +++++
.../kinesis/internals/ShardConsumerTest.java | 122 ++
.../manualtests/ManualConsumerProducerTest.java | 121 ++
.../manualtests/ManualExactlyOnceTest.java | 147 ++
...nualExactlyOnceWithStreamReshardingTest.java | 247 +++
.../kinesis/manualtests/ManualProducerTest.java | 91 +
.../ExactlyOnceValidatingConsumerThread.java | 155 ++
.../testutils/FakeKinesisBehavioursFactory.java | 262 +++
.../KinesisEventsGeneratorProducerThread.java | 118 ++
.../testutils/KinesisShardIdGenerator.java | 25 +
.../testutils/TestableFlinkKinesisConsumer.java | 60 +
.../testutils/TestableKinesisDataFetcher.java | 122 ++
flink-connectors/flink-connector-nifi/pom.xml | 89 +
.../connectors/nifi/NiFiDataPacket.java | 39 +
.../connectors/nifi/NiFiDataPacketBuilder.java | 34 +
.../streaming/connectors/nifi/NiFiSink.java | 74 +
.../streaming/connectors/nifi/NiFiSource.java | 155 ++
.../connectors/nifi/StandardNiFiDataPacket.java | 46 +
.../nifi/examples/NiFiSinkTopologyExample.java | 55 +
.../examples/NiFiSourceTopologyExample.java | 58 +
.../src/test/resources/NiFi_Flink.xml | 16 +
.../flink-connector-rabbitmq/pom.xml | 60 +
.../streaming/connectors/rabbitmq/RMQSink.java | 142 ++
.../connectors/rabbitmq/RMQSource.java | 243 +++
.../rabbitmq/common/RMQConnectionConfig.java | 448 ++++
.../connectors/rabbitmq/RMQSourceTest.java | 419 ++++
.../common/RMQConnectionConfigTest.java | 69 +
.../connectors/rabbitmq/common/RMQSinkTest.java | 125 ++
flink-connectors/flink-connector-redis/pom.xml | 79 +
.../streaming/connectors/redis/RedisSink.java | 188 ++
.../common/config/FlinkJedisClusterConfig.java | 187 ++
.../common/config/FlinkJedisConfigBase.java | 90 +
.../common/config/FlinkJedisPoolConfig.java | 224 ++
.../common/config/FlinkJedisSentinelConfig.java | 259 +++
.../common/container/RedisClusterContainer.java | 171 ++
.../container/RedisCommandsContainer.java | 115 +
.../RedisCommandsContainerBuilder.java | 116 +
.../redis/common/container/RedisContainer.java | 252 +++
.../redis/common/mapper/RedisCommand.java | 86 +
.../common/mapper/RedisCommandDescription.java | 94 +
.../redis/common/mapper/RedisDataType.java | 66 +
.../redis/common/mapper/RedisMapper.java | 66 +
.../connectors/redis/RedisITCaseBase.java | 45 +
.../redis/RedisSentinelClusterTest.java | 100 +
.../connectors/redis/RedisSinkITCase.java | 233 ++
.../redis/RedisSinkPublishITCase.java | 137 ++
.../connectors/redis/RedisSinkTest.java | 144 ++
.../common/config/FlinkJedisConfigBaseTest.java | 50 +
.../common/config/JedisClusterConfigTest.java | 49 +
.../common/config/JedisPoolConfigTest.java | 29 +
.../common/config/JedisSentinelConfigTest.java | 49 +
.../mapper/RedisDataTypeDescriptionTest.java | 41 +
.../flink-connector-twitter/pom.xml | 96 +
.../connectors/twitter/TwitterSource.java | 217 ++
.../flink-hadoop-compatibility/pom.xml | 182 ++
.../api/java/typeutils/WritableTypeInfo.java | 154 ++
.../typeutils/runtime/WritableComparator.java | 188 ++
.../typeutils/runtime/WritableSerializer.java | 152 ++
.../flink/hadoopcompatibility/HadoopInputs.java | 118 ++
.../flink/hadoopcompatibility/HadoopUtils.java | 52 +
.../mapred/HadoopMapFunction.java | 133 ++
.../mapred/HadoopReduceCombineFunction.java | 168 ++
.../mapred/HadoopReduceFunction.java | 142 ++
.../mapred/wrapper/HadoopOutputCollector.java | 59 +
.../wrapper/HadoopTupleUnwrappingIterator.java | 94 +
.../scala/HadoopInputs.scala | 143 ++
.../java/typeutils/WritableExtractionTest.java | 206 ++
.../java/typeutils/WritableInfoParserTest.java | 84 +
.../java/typeutils/WritableTypeInfoTest.java | 72 +
.../typeutils/runtime/StringArrayWritable.java | 83 +
.../runtime/WritableComparatorTest.java | 53 +
.../runtime/WritableComparatorUUIDTest.java | 46 +
.../api/java/typeutils/runtime/WritableID.java | 78 +
.../runtime/WritableSerializerTest.java | 50 +
.../runtime/WritableSerializerUUIDTest.java | 50 +
.../hadoopcompatibility/HadoopUtilsTest.java | 34 +
.../mapred/HadoopMapFunctionITCase.java | 182 ++
.../mapred/HadoopMapredITCase.java | 47 +
.../HadoopReduceCombineFunctionITCase.java | 265 +++
.../mapred/HadoopReduceFunctionITCase.java | 213 ++
.../mapred/HadoopTestData.java | 62 +
.../example/HadoopMapredCompatWordCount.java | 133 ++
.../HadoopTupleUnwrappingIteratorTest.java | 139 ++
.../mapreduce/HadoopInputOutputITCase.java | 47 +
.../mapreduce/example/WordCount.java | 119 ++
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/logback-test.xml | 29 +
flink-connectors/flink-hbase/pom.xml | 264 +++
.../flink/addons/hbase/TableInputFormat.java | 289 +++
.../flink/addons/hbase/TableInputSplit.java | 89 +
.../hbase/HBaseTestingClusterAutostarter.java | 238 +++
.../addons/hbase/TableInputFormatITCase.java | 120 ++
.../hbase/example/HBaseFlinkTestConstants.java | 28 +
.../addons/hbase/example/HBaseReadExample.java | 92 +
.../addons/hbase/example/HBaseWriteExample.java | 202 ++
.../hbase/example/HBaseWriteStreamExample.java | 113 +
.../src/test/resources/log4j-test.properties | 23 +
flink-connectors/flink-hcatalog/pom.xml | 182 ++
.../flink/hcatalog/HCatInputFormatBase.java | 410 ++++
.../flink/hcatalog/java/HCatInputFormat.java | 160 ++
.../flink/hcatalog/scala/HCatInputFormat.scala | 229 ++
flink-connectors/flink-jdbc/pom.xml | 66 +
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 404 ++++
.../api/java/io/jdbc/JDBCOutputFormat.java | 315 +++
.../split/GenericParameterValuesProvider.java | 44 +
.../split/NumericBetweenParametersProvider.java | 66 +
.../io/jdbc/split/ParameterValuesProvider.java | 35 +
.../flink/api/java/io/jdbc/JDBCFullTest.java | 101 +
.../api/java/io/jdbc/JDBCInputFormatTest.java | 247 +++
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 169 ++
.../flink/api/java/io/jdbc/JDBCTestBase.java | 183 ++
.../src/test/resources/log4j-test.properties | 19 +
.../src/test/resources/logback-test.xml | 29 +
flink-connectors/pom.xml | 75 +
.../flink-connector-cassandra/pom.xml | 179 --
.../cassandra/CassandraInputFormat.java | 131 --
.../cassandra/CassandraOutputFormat.java | 125 --
.../cassandra/CassandraCommitter.java | 151 --
.../connectors/cassandra/CassandraPojoSink.java | 67 -
.../connectors/cassandra/CassandraSink.java | 329 ---
.../connectors/cassandra/CassandraSinkBase.java | 98 -
.../cassandra/CassandraTupleSink.java | 59 -
.../cassandra/CassandraTupleWriteAheadSink.java | 159 --
.../connectors/cassandra/ClusterBuilder.java | 43 -
.../cassandra/example/BatchExample.java | 77 -
.../cassandra/CassandraConnectorITCase.java | 440 ----
.../CassandraTupleWriteAheadSinkTest.java | 127 --
.../streaming/connectors/cassandra/Pojo.java | 65 -
.../example/CassandraPojoSinkExample.java | 62 -
.../example/CassandraTupleSinkExample.java | 62 -
.../CassandraTupleWriteAheadSinkExample.java | 96 -
.../connectors/cassandra/example/Message.java | 56 -
.../src/test/resources/cassandra.yaml | 43 -
.../src/test/resources/log4j-test.properties | 29 -
.../flink-connector-elasticsearch/pom.xml | 90 -
.../elasticsearch/ElasticsearchSink.java | 315 ---
.../elasticsearch/IndexRequestBuilder.java | 66 -
.../elasticsearch/ElasticsearchSinkITCase.java | 205 --
.../examples/ElasticsearchExample.java | 80 -
.../src/test/resources/log4j-test.properties | 27 -
.../src/test/resources/logback-test.xml | 30 -
.../flink-connector-elasticsearch2/pom.xml | 83 -
.../elasticsearch2/BulkProcessorIndexer.java | 35 -
.../elasticsearch2/ElasticsearchSink.java | 257 ---
.../ElasticsearchSinkFunction.java | 60 -
.../elasticsearch2/RequestIndexer.java | 25 -
.../elasticsearch2/ElasticsearchSinkITCase.java | 233 --
.../examples/ElasticsearchExample.java | 90 -
.../src/test/resources/log4j-test.properties | 27 -
.../src/test/resources/logback-test.xml | 30 -
.../flink-connector-filesystem/pom.xml | 163 --
.../connectors/fs/AvroKeyValueSinkWriter.java | 309 ---
.../flink/streaming/connectors/fs/Bucketer.java | 55 -
.../flink/streaming/connectors/fs/Clock.java | 33 -
.../connectors/fs/DateTimeBucketer.java | 126 --
.../connectors/fs/NonRollingBucketer.java | 47 -
.../streaming/connectors/fs/RollingSink.java | 916 --------
.../connectors/fs/SequenceFileWriter.java | 151 --
.../connectors/fs/StreamWriterBase.java | 152 --
.../streaming/connectors/fs/StringWriter.java | 86 -
.../streaming/connectors/fs/SystemClock.java | 29 -
.../flink/streaming/connectors/fs/Writer.java | 73 -
.../fs/bucketing/BasePathBucketer.java | 39 -
.../connectors/fs/bucketing/Bucketer.java | 47 -
.../connectors/fs/bucketing/BucketingSink.java | 1082 ----------
.../fs/bucketing/DateTimeBucketer.java | 102 -
.../src/main/resources/log4j.properties | 27 -
.../fs/RollingSinkFaultToleranceITCase.java | 300 ---
.../connectors/fs/RollingSinkITCase.java | 991 ---------
.../connectors/fs/RollingSinkSecuredITCase.java | 252 ---
.../BucketingSinkFaultToleranceITCase.java | 297 ---
.../fs/bucketing/BucketingSinkTest.java | 867 --------
.../src/test/resources/log4j-test.properties | 29 -
.../src/test/resources/logback-test.xml | 30 -
.../flink-connector-flume/pom.xml | 175 --
.../streaming/connectors/flume/FlumeSink.java | 141 --
.../flink-connector-kafka-0.10/pom.xml | 205 --
.../connectors/kafka/FlinkKafkaConsumer010.java | 153 --
.../connectors/kafka/FlinkKafkaProducer010.java | 398 ----
.../kafka/Kafka010JsonTableSource.java | 71 -
.../connectors/kafka/Kafka010TableSource.java | 75 -
.../kafka/internal/Kafka010Fetcher.java | 104 -
.../internal/KafkaConsumerCallBridge010.java | 40 -
.../src/main/resources/log4j.properties | 29 -
.../connectors/kafka/Kafka010FetcherTest.java | 484 -----
.../connectors/kafka/Kafka010ITCase.java | 313 ---
.../kafka/Kafka010ProducerITCase.java | 33 -
.../kafka/KafkaTestEnvironmentImpl.java | 420 ----
.../src/test/resources/log4j-test.properties | 30 -
.../src/test/resources/logback-test.xml | 30 -
.../flink-connector-kafka-0.8/pom.xml | 219 --
.../connectors/kafka/FlinkKafkaConsumer08.java | 398 ----
.../connectors/kafka/FlinkKafkaConsumer081.java | 39 -
.../connectors/kafka/FlinkKafkaConsumer082.java | 39 -
.../connectors/kafka/FlinkKafkaProducer.java | 64 -
.../connectors/kafka/FlinkKafkaProducer08.java | 145 --
.../connectors/kafka/Kafka08JsonTableSink.java | 52 -
.../kafka/Kafka08JsonTableSource.java | 71 -
.../connectors/kafka/Kafka08TableSource.java | 75 -
.../kafka/internals/ClosableBlockingQueue.java | 507 -----
.../kafka/internals/Kafka08Fetcher.java | 481 -----
.../kafka/internals/KillerWatchDog.java | 62 -
.../kafka/internals/PartitionInfoFetcher.java | 66 -
.../internals/PeriodicOffsetCommitter.java | 85 -
.../kafka/internals/SimpleConsumerThread.java | 504 -----
.../kafka/internals/ZookeeperOffsetHandler.java | 164 --
.../connectors/kafka/Kafka08ITCase.java | 248 ---
.../kafka/Kafka08JsonTableSinkTest.java | 48 -
.../kafka/Kafka08JsonTableSourceTest.java | 45 -
.../connectors/kafka/Kafka08ProducerITCase.java | 32 -
.../connectors/kafka/KafkaConsumer08Test.java | 139 --
.../connectors/kafka/KafkaLocalSystemTime.java | 48 -
.../connectors/kafka/KafkaProducerTest.java | 123 --
.../kafka/KafkaShortRetention08ITCase.java | 34 -
.../kafka/KafkaTestEnvironmentImpl.java | 401 ----
.../internals/ClosableBlockingQueueTest.java | 603 ------
.../src/test/resources/log4j-test.properties | 30 -
.../src/test/resources/logback-test.xml | 30 -
.../flink-connector-kafka-0.9/pom.xml | 212 --
.../connectors/kafka/FlinkKafkaConsumer09.java | 269 ---
.../connectors/kafka/FlinkKafkaProducer09.java | 137 --
.../connectors/kafka/Kafka09JsonTableSink.java | 50 -
.../kafka/Kafka09JsonTableSource.java | 71 -
.../connectors/kafka/Kafka09TableSource.java | 75 -
.../connectors/kafka/internal/Handover.java | 214 --
.../kafka/internal/Kafka09Fetcher.java | 241 ---
.../kafka/internal/KafkaConsumerCallBridge.java | 41 -
.../kafka/internal/KafkaConsumerThread.java | 332 ---
.../src/main/resources/log4j.properties | 29 -
.../connectors/kafka/Kafka09FetcherTest.java | 482 -----
.../connectors/kafka/Kafka09ITCase.java | 129 --
.../kafka/Kafka09JsonTableSinkTest.java | 48 -
.../kafka/Kafka09JsonTableSourceTest.java | 45 -
.../connectors/kafka/Kafka09ProducerITCase.java | 32 -
.../kafka/Kafka09SecuredRunITCase.java | 62 -
.../connectors/kafka/KafkaProducerTest.java | 126 --
.../kafka/KafkaTestEnvironmentImpl.java | 439 ----
.../connectors/kafka/internal/HandoverTest.java | 387 ----
.../src/test/resources/log4j-test.properties | 32 -
.../src/test/resources/logback-test.xml | 30 -
.../flink-connector-kafka-base/pom.xml | 212 --
.../kafka/FlinkKafkaConsumerBase.java | 552 -----
.../kafka/FlinkKafkaProducerBase.java | 386 ----
.../connectors/kafka/KafkaJsonTableSink.java | 47 -
.../connectors/kafka/KafkaJsonTableSource.java | 97 -
.../connectors/kafka/KafkaTableSink.java | 127 --
.../connectors/kafka/KafkaTableSource.java | 155 --
.../kafka/internals/AbstractFetcher.java | 552 -----
.../kafka/internals/ExceptionProxy.java | 125 --
.../kafka/internals/KafkaTopicPartition.java | 120 --
.../internals/KafkaTopicPartitionLeader.java | 98 -
.../internals/KafkaTopicPartitionState.java | 118 --
...picPartitionStateWithPeriodicWatermarks.java | 71 -
...cPartitionStateWithPunctuatedWatermarks.java | 84 -
.../connectors/kafka/internals/TypeUtil.java | 38 -
.../internals/metrics/KafkaMetricWrapper.java | 37 -
.../kafka/partitioner/FixedPartitioner.java | 76 -
.../kafka/partitioner/KafkaPartitioner.java | 41 -
.../JSONDeserializationSchema.java | 46 -
.../JSONKeyValueDeserializationSchema.java | 72 -
.../JsonRowDeserializationSchema.java | 135 --
.../JsonRowSerializationSchema.java | 70 -
.../KeyedDeserializationSchema.java | 52 -
.../KeyedDeserializationSchemaWrapper.java | 51 -
.../serialization/KeyedSerializationSchema.java | 55 -
.../KeyedSerializationSchemaWrapper.java | 48 -
...eInformationKeyValueSerializationSchema.java | 196 --
.../kafka/FlinkKafkaConsumerBaseTest.java | 416 ----
.../kafka/FlinkKafkaProducerBaseTest.java | 288 ---
.../kafka/JSONDeserializationSchemaTest.java | 41 -
.../JSONKeyValueDeserializationSchemaTest.java | 68 -
.../kafka/JsonRowDeserializationSchemaTest.java | 124 --
.../kafka/JsonRowSerializationSchemaTest.java | 98 -
.../KafkaConsumerPartitionAssignmentTest.java | 269 ---
.../connectors/kafka/KafkaConsumerTestBase.java | 2006 ------------------
.../connectors/kafka/KafkaProducerTestBase.java | 193 --
.../kafka/KafkaShortRetentionTestBase.java | 291 ---
.../kafka/KafkaTableSinkTestBase.java | 106 -
.../kafka/KafkaTableSourceTestBase.java | 77 -
.../connectors/kafka/KafkaTestBase.java | 203 --
.../connectors/kafka/KafkaTestEnvironment.java | 112 -
.../connectors/kafka/TestFixedPartitioner.java | 104 -
.../AbstractFetcherTimestampsTest.java | 320 ---
.../internals/KafkaTopicPartitionTest.java | 57 -
.../kafka/testutils/DataGenerators.java | 227 --
.../kafka/testutils/FailingIdentityMapper.java | 115 -
.../testutils/FakeStandardProducerConfig.java | 34 -
.../testutils/JobManagerCommunicationUtils.java | 120 --
.../testutils/PartitionValidatingMapper.java | 53 -
.../kafka/testutils/ThrottledMapper.java | 44 -
.../kafka/testutils/Tuple2Partitioner.java | 48 -
.../testutils/ValidatingExactlyOnceSink.java | 82 -
.../testutils/ZooKeeperStringSerializer.java | 51 -
.../src/test/resources/log4j-test.properties | 29 -
.../src/test/resources/logback-test.xml | 30 -
.../flink-connector-kinesis/pom.xml | 164 --
.../kinesis/FlinkKinesisConsumer.java | 304 ---
.../kinesis/FlinkKinesisProducer.java | 292 ---
.../connectors/kinesis/KinesisPartitioner.java | 49 -
.../kinesis/config/AWSConfigConstants.java | 70 -
.../kinesis/config/ConsumerConfigConstants.java | 138 --
.../kinesis/config/ProducerConfigConstants.java | 33 -
.../kinesis/examples/ConsumeFromKinesis.java | 54 -
.../kinesis/examples/ProduceIntoKinesis.java | 77 -
.../kinesis/internals/KinesisDataFetcher.java | 679 ------
.../kinesis/internals/ShardConsumer.java | 287 ---
.../kinesis/model/KinesisStreamShard.java | 133 --
.../kinesis/model/KinesisStreamShardState.java | 71 -
.../kinesis/model/SentinelSequenceNumber.java | 51 -
.../kinesis/model/SequenceNumber.java | 104 -
.../kinesis/proxy/GetShardListResult.java | 75 -
.../connectors/kinesis/proxy/KinesisProxy.java | 338 ---
.../kinesis/proxy/KinesisProxyInterface.java | 69 -
.../KinesisDeserializationSchema.java | 57 -
.../KinesisDeserializationSchemaWrapper.java | 57 -
.../KinesisSerializationSchema.java | 45 -
.../connectors/kinesis/util/AWSUtil.java | 130 --
.../kinesis/util/KinesisConfigUtil.java | 218 --
.../src/main/resources/log4j.properties | 27 -
.../kinesis/FlinkKinesisConsumerTest.java | 472 -----
.../internals/KinesisDataFetcherTest.java | 510 -----
.../kinesis/internals/ShardConsumerTest.java | 122 --
.../manualtests/ManualConsumerProducerTest.java | 121 --
.../manualtests/ManualExactlyOnceTest.java | 147 --
...nualExactlyOnceWithStreamReshardingTest.java | 247 ---
.../kinesis/manualtests/ManualProducerTest.java | 91 -
.../ExactlyOnceValidatingConsumerThread.java | 155 --
.../testutils/FakeKinesisBehavioursFactory.java | 262 ---
.../KinesisEventsGeneratorProducerThread.java | 118 --
.../testutils/KinesisShardIdGenerator.java | 25 -
.../testutils/TestableFlinkKinesisConsumer.java | 60 -
.../testutils/TestableKinesisDataFetcher.java | 122 --
.../flink-connector-nifi/pom.xml | 89 -
.../connectors/nifi/NiFiDataPacket.java | 39 -
.../connectors/nifi/NiFiDataPacketBuilder.java | 34 -
.../streaming/connectors/nifi/NiFiSink.java | 74 -
.../streaming/connectors/nifi/NiFiSource.java | 155 --
.../connectors/nifi/StandardNiFiDataPacket.java | 46 -
.../nifi/examples/NiFiSinkTopologyExample.java | 55 -
.../examples/NiFiSourceTopologyExample.java | 58 -
.../src/test/resources/NiFi_Flink.xml | 16 -
.../flink-connector-rabbitmq/pom.xml | 60 -
.../streaming/connectors/rabbitmq/RMQSink.java | 142 --
.../connectors/rabbitmq/RMQSource.java | 243 ---
.../rabbitmq/common/RMQConnectionConfig.java | 448 ----
.../connectors/rabbitmq/RMQSourceTest.java | 419 ----
.../common/RMQConnectionConfigTest.java | 69 -
.../connectors/rabbitmq/common/RMQSinkTest.java | 125 --
.../flink-connector-redis/pom.xml | 79 -
.../streaming/connectors/redis/RedisSink.java | 188 --
.../common/config/FlinkJedisClusterConfig.java | 187 --
.../common/config/FlinkJedisConfigBase.java | 90 -
.../common/config/FlinkJedisPoolConfig.java | 224 --
.../common/config/FlinkJedisSentinelConfig.java | 259 ---
.../common/container/RedisClusterContainer.java | 171 --
.../container/RedisCommandsContainer.java | 115 -
.../RedisCommandsContainerBuilder.java | 116 -
.../redis/common/container/RedisContainer.java | 252 ---
.../redis/common/mapper/RedisCommand.java | 86 -
.../common/mapper/RedisCommandDescription.java | 94 -
.../redis/common/mapper/RedisDataType.java | 66 -
.../redis/common/mapper/RedisMapper.java | 66 -
.../connectors/redis/RedisITCaseBase.java | 45 -
.../redis/RedisSentinelClusterTest.java | 100 -
.../connectors/redis/RedisSinkITCase.java | 233 --
.../redis/RedisSinkPublishITCase.java | 137 --
.../connectors/redis/RedisSinkTest.java | 144 --
.../common/config/FlinkJedisConfigBaseTest.java | 50 -
.../common/config/JedisClusterConfigTest.java | 49 -
.../common/config/JedisPoolConfigTest.java | 29 -
.../common/config/JedisSentinelConfigTest.java | 49 -
.../mapper/RedisDataTypeDescriptionTest.java | 41 -
.../flink-connector-twitter/pom.xml | 96 -
.../connectors/twitter/TwitterSource.java | 217 --
flink-streaming-connectors/pom.xml | 70 -
pom.xml | 9 +-
687 files changed, 53344 insertions(+), 53385 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e18629d..6cbccc1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,7 +17,7 @@ tmp
*.log
.DS_Store
build-target
-flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/
+flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/
flink-runtime-web/web-dashboard/assets/fonts/
flink-runtime-web/web-dashboard/node_modules/
flink-runtime-web/web-dashboard/bower_components/
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/pom.xml b/flink-batch-connectors/flink-avro/pom.xml
deleted file mode 100644
index 1161173..0000000
--- a/flink-batch-connectors/flink-avro/pom.xml
+++ /dev/null
@@ -1,216 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-batch-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-avro_2.10</artifactId>
- <name>flink-avro</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <!-- version is derived from base module -->
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils-junit</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>create-test-dependency</id>
- <phase>process-test-classes</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <archive>
- <manifest>
- <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
- </manifest>
- </archive>
- <finalName>maven</finalName>
- <attach>false</attach>
- <descriptors>
- <descriptor>src/test/assembly/test-assembly.xml</descriptor>
- </descriptors>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!--Remove the AvroExternalJarProgram code from the test-classes directory since it musn't be in the
- classpath when running the tests to actually test whether the user code class loader
- is properly used.-->
- <plugin>
- <artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version><!--$NO-MVN-MAN-VER$-->
- <executions>
- <execution>
- <id>remove-avroexternalprogram</id>
- <phase>process-test-classes</phase>
- <goals>
- <goal>clean</goal>
- </goals>
- <configuration>
- <excludeDefaultDirectories>true</excludeDefaultDirectories>
- <filesets>
- <fileset>
- <directory>${project.build.testOutputDirectory}</directory>
- <includes>
- <include>**/testjar/*.class</include>
- </includes>
- </fileset>
- </filesets>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Generate Test class from avro schema -->
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <version>1.7.7</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>schema</goal>
- </goals>
- <configuration>
- <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
- <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <versionRange>[2.4,)</versionRange>
- <goals>
- <goal>single</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clean-plugin</artifactId>
- <versionRange>[1,)</versionRange>
- <goals>
- <goal>clean</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <versionRange>[1.7.7,)</versionRange>
- <goals>
- <goal>schema</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
deleted file mode 100644
index 59da4cb..0000000
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-
-public class DataInputDecoder extends Decoder {
-
- private final Utf8 stringDecoder = new Utf8();
-
- private DataInput in;
-
- public void setIn(DataInput in) {
- this.in = in;
- }
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readNull() {}
-
-
- @Override
- public boolean readBoolean() throws IOException {
- return in.readBoolean();
- }
-
- @Override
- public int readInt() throws IOException {
- return in.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return in.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return in.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return in.readDouble();
- }
-
- @Override
- public int readEnum() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readFixed(byte[] bytes, int start, int length) throws IOException {
- in.readFully(bytes, start, length);
- }
-
- @Override
- public ByteBuffer readBytes(ByteBuffer old) throws IOException {
- int length = readInt();
- ByteBuffer result;
- if (old != null && length <= old.capacity() && old.hasArray()) {
- result = old;
- result.clear();
- } else {
- result = ByteBuffer.allocate(length);
- }
- in.readFully(result.array(), result.arrayOffset() + result.position(), length);
- result.limit(length);
- return result;
- }
-
-
- @Override
- public void skipFixed(int length) throws IOException {
- skipBytes(length);
- }
-
- @Override
- public void skipBytes() throws IOException {
- int num = readInt();
- skipBytes(num);
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public Utf8 readString(Utf8 old) throws IOException {
- int length = readInt();
- Utf8 result = (old != null ? old : new Utf8());
- result.setByteLength(length);
-
- if (length > 0) {
- in.readFully(result.getBytes(), 0, length);
- }
-
- return result;
- }
-
- @Override
- public String readString() throws IOException {
- return readString(stringDecoder).toString();
- }
-
- @Override
- public void skipString() throws IOException {
- int len = readInt();
- skipBytes(len);
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- @Override
- public long readArrayStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long arrayNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipArray() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long readMapStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long mapNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipMap() throws IOException {
- return readVarLongCount(in);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int readIndex() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
- private void skipBytes(int num) throws IOException {
- while (num > 0) {
- num -= in.skipBytes(num);
- }
- }
-
- public static long readVarLongCount(DataInput in) throws IOException {
- long value = in.readUnsignedByte();
-
- if ((value & 0x80) == 0) {
- return value;
- }
- else {
- long curr;
- int shift = 7;
- value = value & 0x7f;
- while (((curr = in.readUnsignedByte()) & 0x80) != 0){
- value |= (curr & 0x7f) << shift;
- shift += 7;
- }
- value |= curr << shift;
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
deleted file mode 100644
index 0102cc1..0000000
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private DataOutput out;
-
-
- public void setOut(DataOutput out) {
- this.out = out;
- }
-
-
- @Override
- public void flush() throws IOException {}
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeNull() {}
-
-
- @Override
- public void writeBoolean(boolean b) throws IOException {
- out.writeBoolean(b);
- }
-
- @Override
- public void writeInt(int n) throws IOException {
- out.writeInt(n);
- }
-
- @Override
- public void writeLong(long n) throws IOException {
- out.writeLong(n);
- }
-
- @Override
- public void writeFloat(float f) throws IOException {
- out.writeFloat(f);
- }
-
- @Override
- public void writeDouble(double d) throws IOException {
- out.writeDouble(d);
- }
-
- @Override
- public void writeEnum(int e) throws IOException {
- out.writeInt(e);
- }
-
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeFixed(byte[] bytes, int start, int len) throws IOException {
- out.write(bytes, start, len);
- }
-
- @Override
- public void writeBytes(byte[] bytes, int start, int len) throws IOException {
- out.writeInt(len);
- if (len > 0) {
- out.write(bytes, start, len);
- }
- }
-
- @Override
- public void writeBytes(ByteBuffer bytes) throws IOException {
- int num = bytes.remaining();
- out.writeInt(num);
-
- if (num > 0) {
- writeFixed(bytes);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeString(String str) throws IOException {
- byte[] bytes = Utf8.getBytesFor(str);
- writeBytes(bytes, 0, bytes.length);
- }
-
- @Override
- public void writeString(Utf8 utf8) throws IOException {
- writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeArrayStart() {}
-
- @Override
- public void setItemCount(long itemCount) throws IOException {
- if (itemCount > 0) {
- writeVarLongCount(out, itemCount);
- }
- }
-
- @Override
- public void startItem() {}
-
- @Override
- public void writeArrayEnd() throws IOException {
- // write a single byte 0, shortcut for a var-length long of 0
- out.write(0);
- }
-
- @Override
- public void writeMapStart() {}
-
- @Override
- public void writeMapEnd() throws IOException {
- // write a single byte 0, shortcut for a var-length long of 0
- out.write(0);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeIndex(int unionIndex) throws IOException {
- out.writeInt(unionIndex);
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
-
- public static void writeVarLongCount(DataOutput out, long val) throws IOException {
- if (val < 0) {
- throw new IOException("Illegal count (must be non-negative): " + val);
- }
-
- while ((val & ~0x7FL) != 0) {
- out.write(((int) val) | 0x80);
- val >>>= 7;
- }
- out.write((int) val);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
deleted file mode 100644
index 709c4f1..0000000
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.avro.file.SeekableInput;
-import org.apache.flink.core.fs.FSDataInputStream;
-
-
-/**
- * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well)
- *
- * The wrapper keeps track of the position in the data stream.
- */
-public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
- private final FSDataInputStream stream;
- private long pos;
- private long len;
-
- public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
- this.stream = stream;
- this.pos = 0;
- this.len = len;
- }
-
- public long length() throws IOException {
- return this.len;
- }
-
- public int read(byte[] b, int off, int len) throws IOException {
- int read;
- read = stream.read(b, off, len);
- pos += read;
- return read;
- }
-
- public void seek(long p) throws IOException {
- stream.seek(p);
- pos = p;
- }
-
- public long tell() throws IOException {
- return pos;
- }
-
- public void close() throws IOException {
- stream.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
deleted file mode 100644
index 73067c1..0000000
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.common.io.CheckpointableInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * Provides a {@link FileInputFormat} for Avro records.
- *
- * @param <E>
- * the type of the result Avro record. If you specify
- * {@link GenericRecord} then the result will be returned as a
- * {@link GenericRecord}, so you do not have to know the schema ahead
- * of time.
- */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
- CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
-
- private final Class<E> avroValueType;
-
- private boolean reuseAvroValue = true;
-
- private transient DataFileReader<E> dataFileReader;
-
- private transient long end;
-
- private transient long recordsReadSinceLastSync;
-
- private long lastSync = -1l;
-
- public AvroInputFormat(Path filePath, Class<E> type) {
- super(filePath);
- this.avroValueType = type;
- }
-
- /**
- * Sets the flag whether to reuse the Avro value instance for all records.
- * By default, the input format reuses the Avro value.
- *
- * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
- */
- public void setReuseAvroValue(boolean reuseAvroValue) {
- this.reuseAvroValue = reuseAvroValue;
- }
-
- /**
- * If set, the InputFormat will only read entire files.
- */
- public void setUnsplittable(boolean unsplittable) {
- this.unsplittable = unsplittable;
- }
-
- // --------------------------------------------------------------------------------------------
- // Typing
- // --------------------------------------------------------------------------------------------
-
- @Override
- public TypeInformation<E> getProducedType() {
- return TypeExtractor.getForClass(this.avroValueType);
- }
-
- // --------------------------------------------------------------------------------------------
- // Input Format Methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
- dataFileReader = initReader(split);
- dataFileReader.sync(split.getStart());
- lastSync = dataFileReader.previousSync();
- }
-
- private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
- DatumReader<E> datumReader;
-
- if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
- datumReader = new GenericDatumReader<E>();
- } else {
- datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
- ? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("Opening split {}", split);
- }
-
- SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
- DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
- }
-
- end = split.getStart() + split.getLength();
- recordsReadSinceLastSync = 0;
- return dataFileReader;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
- }
-
- public long getRecordsReadFromBlock() {
- return this.recordsReadSinceLastSync;
- }
-
- @Override
- public E nextRecord(E reuseValue) throws IOException {
- if (reachedEnd()) {
- return null;
- }
-
- // if we start a new block, then register the event, and
- // restart the counter.
- if(dataFileReader.previousSync() != lastSync) {
- lastSync = dataFileReader.previousSync();
- recordsReadSinceLastSync = 0;
- }
- recordsReadSinceLastSync++;
-
- if (reuseAvroValue) {
- return dataFileReader.next(reuseValue);
- } else {
- if (GenericRecord.class == avroValueType) {
- return dataFileReader.next();
- } else {
- return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Checkpointing
- // --------------------------------------------------------------------------------------------
-
- @Override
- public Tuple2<Long, Long> getCurrentState() throws IOException {
- return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
- }
-
- @Override
- public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
- Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
- Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
-
- try {
- this.open(split);
- } finally {
- if (state.f0 != -1) {
- lastSync = state.f0;
- recordsReadSinceLastSync = state.f1;
- }
- }
-
- if (lastSync != -1) {
- // open and read until the record we were before
- // the checkpoint and discard the values
- dataFileReader.seek(lastSync);
- for(int i = 0; i < recordsReadSinceLastSync; i++) {
- dataFileReader.next(null);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
deleted file mode 100644
index 600d1e5..0000000
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
-
- /**
- * Wrapper which encapsulates the supported codec and a related serialization byte.
- */
- public enum Codec {
-
- NULL((byte)0, CodecFactory.nullCodec()),
- SNAPPY((byte)1, CodecFactory.snappyCodec()),
- BZIP2((byte)2, CodecFactory.bzip2Codec()),
- DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
- XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
-
- private byte codecByte;
-
- private CodecFactory codecFactory;
-
- Codec(final byte codecByte, final CodecFactory codecFactory) {
- this.codecByte = codecByte;
- this.codecFactory = codecFactory;
- }
-
- private byte getCodecByte() {
- return codecByte;
- }
-
- private CodecFactory getCodecFactory() {
- return codecFactory;
- }
-
- private static Codec forCodecByte(byte codecByte) {
- for (final Codec codec : Codec.values()) {
- if (codec.getCodecByte() == codecByte) {
- return codec;
- }
- }
- throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
- }
- }
-
- private static final long serialVersionUID = 1L;
-
- private final Class<E> avroValueType;
-
- private transient Schema userDefinedSchema = null;
-
- private transient Codec codec = null;
-
- private transient DataFileWriter<E> dataFileWriter;
-
- public AvroOutputFormat(Path filePath, Class<E> type) {
- super(filePath);
- this.avroValueType = type;
- }
-
- public AvroOutputFormat(Class<E> type) {
- this.avroValueType = type;
- }
-
- @Override
- protected String getDirectoryFileName(int taskNumber) {
- return super.getDirectoryFileName(taskNumber) + ".avro";
- }
-
- public void setSchema(Schema schema) {
- this.userDefinedSchema = schema;
- }
-
- /**
- * Set avro codec for compression.
- *
- * @param codec avro codec.
- */
- public void setCodec(final Codec codec) {
- this.codec = checkNotNull(codec, "codec can not be null");
- }
-
- @Override
- public void writeRecord(E record) throws IOException {
- dataFileWriter.append(record);
- }
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- super.open(taskNumber, numTasks);
-
- DatumWriter<E> datumWriter;
- Schema schema;
- if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
- datumWriter = new SpecificDatumWriter<E>(avroValueType);
- try {
- schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
- } catch (InstantiationException e) {
- throw new RuntimeException(e.getMessage());
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e.getMessage());
- }
- } else {
- datumWriter = new ReflectDatumWriter<E>(avroValueType);
- schema = ReflectData.get().getSchema(avroValueType);
- }
- dataFileWriter = new DataFileWriter<E>(datumWriter);
- if (codec != null) {
- dataFileWriter.setCodec(codec.getCodecFactory());
- }
- if (userDefinedSchema == null) {
- dataFileWriter.create(schema, stream);
- } else {
- dataFileWriter.create(userDefinedSchema, stream);
- }
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
-
- if (codec != null) {
- out.writeByte(codec.getCodecByte());
- } else {
- out.writeByte(-1);
- }
-
- if(userDefinedSchema != null) {
- byte[] json = userDefinedSchema.toString().getBytes();
- out.writeInt(json.length);
- out.write(json);
- } else {
- out.writeInt(0);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
-
- byte codecByte = in.readByte();
- if (codecByte >= 0) {
- setCodec(Codec.forCodecByte(codecByte));
- }
-
- int length = in.readInt();
- if(length != 0) {
- byte[] json = new byte[length];
- in.readFully(json);
-
- Schema schema = new Schema.Parser().parse(new String(json));
- setSchema(schema);
- }
- }
-
- @Override
- public void close() throws IOException {
- dataFileWriter.flush();
- dataFileWriter.close();
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml
deleted file mode 100644
index 0f4561a..0000000
--- a/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<assembly>
- <id>test-jar</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${project.build.testOutputDirectory}</directory>
- <outputDirectory>/</outputDirectory>
- <!--modify/add include to match your package(s) -->
- <includes>
- <include>org/apache/flink/api/avro/testjar/**</include>
- </includes>
- </fileSet>
- </fileSets>
-</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
deleted file mode 100644
index 1030ff8..0000000
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.File;
-
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class AvroExternalJarProgramITCase {
-
- private static final String JAR_FILE = "maven-test-jar.jar";
-
- private static final String TEST_DATA_FILE = "/testdata.avro";
-
- @Test
- public void testExternalProgram() {
-
- LocalFlinkMiniCluster testMiniCluster = null;
-
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
- testMiniCluster = new LocalFlinkMiniCluster(config, false);
- testMiniCluster.start();
-
- String jarFile = JAR_FILE;
- String testData = getClass().getResource(TEST_DATA_FILE).toString();
-
- PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
-
-
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
-
- ClusterClient client = new StandaloneClusterClient(config);
-
- client.setPrintStatusDuringExecution(false);
- client.run(program, 4);
-
- }
- catch (Throwable t) {
- System.err.println(t.getMessage());
- t.printStackTrace();
- Assert.fail("Error during the packaged program execution: " + t.getMessage());
- }
- finally {
- if (testMiniCluster != null) {
- try {
- testMiniCluster.stop();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
deleted file mode 100644
index 3b01ccb..0000000
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatITCase extends JavaProgramTestBase {
-
- public static String outputPath1;
-
- public static String outputPath2;
-
- public static String inputPath;
-
- public static String userData = "alice|1|blue\n" +
- "bob|2|red\n" +
- "john|3|yellow\n" +
- "walt|4|black\n";
-
- @Override
- protected void preSubmit() throws Exception {
- inputPath = createTempFile("user", userData);
- outputPath1 = getTempDirPath("avro_output1");
- outputPath2 = getTempDirPath("avro_output2");
- }
-
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
- .fieldDelimiter("|")
- .types(String.class, Integer.class, String.class);
-
- //output the data with AvroOutputFormat for specific user type
- DataSet<User> specificUser = input.map(new ConvertToUser());
- AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
- avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
- avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
- specificUser.write(avroOutputFormat, outputPath1);
-
- //output the data with AvroOutputFormat for reflect user type
- DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
- reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- //compare result for specific user type
- File [] output1;
- File file1 = asFile(outputPath1);
- if (file1.isDirectory()) {
- output1 = file1.listFiles();
- // check for avro ext in dir.
- for (File avroOutput : output1) {
- Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
- }
- } else {
- output1 = new File[] {file1};
- }
- List<String> result1 = new ArrayList<String>();
- DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
- for (File avroOutput : output1) {
-
- DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
- while (dataFileReader1.hasNext()) {
- User user = dataFileReader1.next();
- result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
- }
- }
- for (String expectedResult : userData.split("\n")) {
- Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
- }
-
- //compare result for reflect user type
- File [] output2;
- File file2 = asFile(outputPath2);
- if (file2.isDirectory()) {
- output2 = file2.listFiles();
- } else {
- output2 = new File[] {file2};
- }
- List<String> result2 = new ArrayList<String>();
- DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
- for (File avroOutput : output2) {
- DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
- while (dataFileReader2.hasNext()) {
- ReflectiveUser user = dataFileReader2.next();
- result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
- }
- }
- for (String expectedResult : userData.split("\n")) {
- Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
- }
-
-
- }
-
-
- public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
-
- @Override
- public User map(Tuple3<String, Integer, String> value) throws Exception {
- return new User(value.f0, value.f1, value.f2);
- }
- }
-
- public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
-
- @Override
- public ReflectiveUser map(User value) throws Exception {
- return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
- }
- }
-
-
- public static class ReflectiveUser {
- private String name;
- private int favoriteNumber;
- private String favoriteColor;
-
- public ReflectiveUser() {}
-
- public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
- this.name = name;
- this.favoriteNumber = favoriteNumber;
- this.favoriteColor = favoriteColor;
- }
-
- public String getName() {
- return this.name;
- }
- public String getFavoriteColor() {
- return this.favoriteColor;
- }
- public int getFavoriteNumber() {
- return this.favoriteNumber;
- }
- }
-}