You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/06/30 02:21:37 UTC
[11/11] flume git commit: FLUME-2937. Integrate checkstyle for
non-test classes
FLUME-2937. Integrate checkstyle for non-test classes
Based on the Google checkstyle file with modifications.
The changes here do not change the generated Java bytecode (after
stripping line numbers). They are syntax / whitespace ONLY.
Code review: https://reviews.apache.org/r/49403/
Reviewed by Hari.
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2252fb19
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2252fb19
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2252fb19
Branch: refs/heads/trunk
Commit: 2252fb1938a4fd578f88c64eb444c74777c46212
Parents: 2fe3938
Author: Mike Percy <mp...@cloudera.com>
Authored: Sun Jun 26 02:57:37 2016 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Wed Jun 29 19:16:57 2016 -0700
----------------------------------------------------------------------
flume-checkstyle/pom.xml | 36 ++
.../resources/flume/checkstyle-suppressions.xml | 43 +++
.../src/main/resources/flume/checkstyle.xml | 177 ++++++++++
.../flume/api/SecureRpcClientFactory.java | 4 +-
.../apache/flume/api/SecureThriftRpcClient.java | 41 ++-
.../flume/auth/FlumeAuthenticationUtil.java | 6 +-
.../flume/auth/KerberosAuthenticator.java | 29 +-
.../org/apache/flume/auth/KerberosUser.java | 4 +-
.../apache/flume/auth/SimpleAuthenticator.java | 4 +-
.../java/org/apache/flume/auth/UGIExecutor.java | 8 +-
.../channel/file/BadCheckpointException.java | 3 +-
.../flume/channel/file/CheckpointRebuilder.java | 110 +++----
.../org/apache/flume/channel/file/Commit.java | 16 +-
.../channel/file/CorruptEventException.java | 3 +-
.../file/EventQueueBackingStoreFactory.java | 71 ++--
.../file/EventQueueBackingStoreFile.java | 138 ++++----
.../file/EventQueueBackingStoreFileV2.java | 2 +-
.../file/EventQueueBackingStoreFileV3.java | 78 ++---
.../apache/flume/channel/file/EventUtils.java | 2 +-
.../apache/flume/channel/file/FileChannel.java | 156 ++++-----
.../channel/file/FileChannelConfiguration.java | 6 +-
.../apache/flume/channel/file/FlumeEvent.java | 26 +-
.../flume/channel/file/FlumeEventPointer.java | 10 +-
.../flume/channel/file/FlumeEventQueue.java | 145 ++++----
.../java/org/apache/flume/channel/file/Log.java | 220 +++++++------
.../org/apache/flume/channel/file/LogFile.java | 177 +++++-----
.../flume/channel/file/LogFileFactory.java | 73 +++--
.../file/LogFileRetryableIOException.java | 3 +
.../apache/flume/channel/file/LogFileV2.java | 19 +-
.../apache/flume/channel/file/LogFileV3.java | 113 ++++---
.../apache/flume/channel/file/LogRecord.java | 22 +-
.../org/apache/flume/channel/file/LogUtils.java | 2 +-
.../org/apache/flume/channel/file/Pair.java | 4 +-
.../java/org/apache/flume/channel/file/Put.java | 13 +-
.../flume/channel/file/ReplayHandler.java | 41 ++-
.../org/apache/flume/channel/file/Rollback.java | 9 +-
.../flume/channel/file/Serialization.java | 86 +++--
.../org/apache/flume/channel/file/Take.java | 10 +-
.../channel/file/TransactionEventRecord.java | 27 +-
.../flume/channel/file/TransactionIDOracle.java | 4 +-
.../flume/channel/file/WritableUtils.java | 8 +-
.../flume/channel/file/WriteOrderOracle.java | 4 +-
.../encryption/AESCTRNoPaddingProvider.java | 16 +-
.../channel/file/encryption/CipherProvider.java | 17 +-
.../file/encryption/CipherProviderFactory.java | 4 +-
.../file/encryption/CipherProviderType.java | 1 -
.../encryption/DecryptionFailureException.java | 1 -
.../file/encryption/JCEFileKeyProvider.java | 10 +-
.../file/encryption/KeyProviderType.java | 1 -
.../flume/channel/file/proto/ProtosFactory.java | 11 +-
.../channel/jdbc/ConfigurationConstants.java | 1 -
.../apache/flume/channel/jdbc/JdbcChannel.java | 1 +
.../flume/channel/jdbc/JdbcChannelProvider.java | 2 +-
.../channel/jdbc/impl/DerbySchemaHandler.java | 24 +-
.../jdbc/impl/JdbcChannelProviderImpl.java | 22 +-
.../channel/jdbc/impl/JdbcTransactionImpl.java | 1 -
.../flume/channel/jdbc/impl/SchemaHandler.java | 4 -
.../channel/jdbc/impl/SchemaHandlerFactory.java | 22 +-
.../flume/channel/kafka/KafkaChannel.java | 87 ++---
.../kafka/KafkaChannelConfiguration.java | 15 +-
.../flume/channel/SpillableMemoryChannel.java | 311 +++++++++---------
.../LoadBalancingLog4jAppender.java | 18 +-
.../clients/log4jappender/Log4jAppender.java | 26 +-
.../clients/log4jappender/Log4jAvroHeaders.java | 13 +-
.../src/main/java/org/apache/flume/Context.java | 10 +-
.../flume/conf/BasicConfigurationConstants.java | 1 -
.../flume/conf/ComponentConfiguration.java | 11 +-
.../conf/ComponentConfigurationFactory.java | 9 +-
.../apache/flume/conf/FlumeConfiguration.java | 142 ++++----
.../flume/conf/FlumeConfigurationError.java | 2 +-
.../flume/conf/sink/SinkConfiguration.java | 4 +-
.../flume/conf/sink/SinkGroupConfiguration.java | 3 +-
.../flume/conf/source/SourceConfiguration.java | 4 +-
.../java/org/apache/flume/ChannelFactory.java | 9 +-
.../java/org/apache/flume/ChannelSelector.java | 1 -
.../src/main/java/org/apache/flume/Clock.java | 4 +-
.../main/java/org/apache/flume/SinkFactory.java | 8 +-
.../main/java/org/apache/flume/SinkRunner.java | 6 +-
.../java/org/apache/flume/SourceFactory.java | 7 +-
.../java/org/apache/flume/SourceRunner.java | 2 +-
.../main/java/org/apache/flume/SystemClock.java | 6 +-
.../main/java/org/apache/flume/Transaction.java | 11 +-
.../apache/flume/annotations/Disposable.java | 2 +
.../flume/annotations/InterfaceStability.java | 1 +
.../apache/flume/annotations/Recyclable.java | 2 +
.../apache/flume/channel/AbstractChannel.java | 6 +-
.../flume/channel/AbstractChannelSelector.java | 2 +-
.../channel/BasicTransactionSemantics.java | 1 +
.../flume/channel/ChannelSelectorFactory.java | 2 +-
.../flume/channel/DefaultChannelFactory.java | 3 +-
.../org/apache/flume/channel/MemoryChannel.java | 98 +++---
.../channel/MultiplexingChannelSelector.java | 8 +-
.../flume/channel/PseudoTxnMemoryChannel.java | 6 +-
.../channel/ReplicatingChannelSelector.java | 6 +-
.../apache/flume/client/avro/AvroCLIClient.java | 17 +-
.../avro/ReliableSpoolingFileEventReader.java | 62 ++--
.../org/apache/flume/event/EventHelper.java | 10 +-
.../flume/formatter/output/BucketPath.java | 327 +++++++++----------
.../formatter/output/DefaultPathManager.java | 2 +-
.../flume/formatter/output/PathManager.java | 32 +-
.../formatter/output/PathManagerFactory.java | 89 ++---
.../flume/formatter/output/PathManagerType.java | 20 +-
.../formatter/output/RollTimePathManager.java | 56 ++--
.../flume/instrumentation/ChannelCounter.java | 10 +-
.../flume/instrumentation/GangliaServer.java | 6 -
.../instrumentation/MonitoredCounterGroup.java | 21 +-
.../flume/instrumentation/MonitoringType.java | 2 +-
.../flume/instrumentation/SinkCounter.java | 2 -
.../flume/instrumentation/SourceCounter.java | 17 +-
.../instrumentation/http/HTTPMetricsServer.java | 8 +-
.../kafka/KafkaChannelCounter.java | 6 +-
.../flume/instrumentation/util/JMXPollUtil.java | 18 +-
.../flume/interceptor/HostInterceptor.java | 3 +-
.../interceptor/RegexExtractorInterceptor.java | 11 +-
.../interceptor/RegexFilteringInterceptor.java | 13 +-
.../SearchAndReplaceInterceptor.java | 6 +-
.../flume/interceptor/StaticInterceptor.java | 13 +-
.../flume/interceptor/TimestampInterceptor.java | 1 +
.../apache/flume/lifecycle/LifecycleAware.java | 58 ++--
.../flume/lifecycle/LifecycleSupervisor.java | 31 +-
.../serialization/AvroEventDeserializer.java | 5 +-
.../serialization/BodyTextEventSerializer.java | 3 +-
.../HeaderAndBodyTextEventSerializer.java | 2 +-
.../flume/serialization/LineDeserializer.java | 3 +-
.../ResettableFileInputStream.java | 28 +-
.../org/apache/flume/sink/AbstractRpcSink.java | 51 ++-
.../org/apache/flume/sink/AbstractSink.java | 5 +-
.../flume/sink/AbstractSinkProcessor.java | 4 +-
.../apache/flume/sink/AbstractSinkSelector.java | 2 +-
.../apache/flume/sink/DefaultSinkFactory.java | 3 +-
.../apache/flume/sink/DefaultSinkProcessor.java | 3 +-
.../flume/sink/FailoverSinkProcessor.java | 31 +-
.../flume/sink/LoadBalancingSinkProcessor.java | 34 +-
.../java/org/apache/flume/sink/LoggerSink.java | 5 +-
.../java/org/apache/flume/sink/NullSink.java | 2 +-
.../org/apache/flume/sink/RollingFileSink.java | 12 +-
.../apache/flume/sink/SinkProcessorFactory.java | 5 +-
.../java/org/apache/flume/sink/ThriftSink.java | 12 +-
.../flume/source/AbstractEventDrivenSource.java | 6 +-
.../flume/source/AbstractPollableSource.java | 10 +-
.../org/apache/flume/source/AbstractSource.java | 4 +-
.../org/apache/flume/source/AvroSource.java | 102 +++---
.../flume/source/BasicSourceSemantics.java | 9 +-
.../flume/source/DefaultSourceFactory.java | 5 +-
.../org/apache/flume/source/ExecSource.java | 50 +--
.../ExecSourceConfigurationConstants.java | 3 +-
.../flume/source/MultiportSyslogTCPSource.java | 4 +-
.../NetcatSourceConfigurationConstants.java | 6 +-
.../flume/source/PollableSourceRunner.java | 8 +-
.../flume/source/SequenceGeneratorSource.java | 6 +-
.../flume/source/SpoolDirectorySource.java | 20 +-
...olDirectorySourceConfigurationConstants.java | 1 +
.../org/apache/flume/source/StressSource.java | 9 +-
.../org/apache/flume/source/SyslogParser.java | 13 +-
.../apache/flume/source/SyslogTcpSource.java | 8 +-
.../apache/flume/source/SyslogUDPSource.java | 26 +-
.../org/apache/flume/source/SyslogUtils.java | 262 +++++++--------
.../org/apache/flume/source/ThriftSource.java | 61 ++--
.../apache/flume/source/http/BLOBHandler.java | 8 +-
.../apache/flume/source/http/HTTPSource.java | 44 +--
.../apache/flume/source/http/JSONHandler.java | 14 +-
.../apache/flume/tools/DirectMemoryUtils.java | 10 +-
.../org/apache/flume/tools/GetJavaProperty.java | 2 +-
.../flume/tools/TimestampRoundDownUtil.java | 9 +-
.../org/apache/flume/tools/VersionInfo.java | 16 +-
.../flume/agent/embedded/EmbeddedAgent.java | 38 ++-
.../embedded/EmbeddedAgentConfiguration.java | 53 ++-
.../flume/agent/embedded/EmbeddedSource.java | 6 +-
.../source/thriftLegacy/ThriftLegacySource.java | 10 +-
.../node/AbstractConfigurationProvider.java | 121 ++++---
.../java/org/apache/flume/node/Application.java | 114 +++----
.../flume/node/ConfigurationProvider.java | 7 +-
...lingPropertiesFileConfigurationProvider.java | 13 +-
.../node/SimpleMaterializedConfiguration.java | 2 +-
.../org/apache/flume/api/AbstractRpcClient.java | 2 +-
.../org/apache/flume/api/FailoverRpcClient.java | 6 +-
.../java/org/apache/flume/api/HostInfo.java | 2 +-
.../flume/api/LoadBalancingRpcClient.java | 9 +-
.../apache/flume/api/NettyAvroRpcClient.java | 38 +--
.../api/RpcClientConfigurationConstants.java | 6 +-
.../org/apache/flume/api/RpcClientFactory.java | 16 +-
.../org/apache/flume/api/ThriftRpcClient.java | 120 ++++---
.../org/apache/flume/event/EventBuilder.java | 2 +-
.../java/org/apache/flume/event/JSONEvent.java | 6 +-
.../org/apache/flume/event/SimpleEvent.java | 2 +-
.../org/apache/flume/util/OrderSelector.java | 5 +-
.../org/apache/flume/sink/kite/DatasetSink.java | 12 +-
.../sink/kite/NonRecoverableEventException.java | 1 -
.../sink/kite/parser/EntityParserFactory.java | 1 -
.../sink/kite/policy/FailurePolicyFactory.java | 1 -
.../flume/sink/hdfs/AbstractHDFSWriter.java | 15 +-
.../flume/sink/hdfs/BucketClosedException.java | 2 +-
.../apache/flume/sink/hdfs/BucketWriter.java | 127 ++++---
.../sink/hdfs/HDFSCompressedDataStream.java | 9 +-
.../apache/flume/sink/hdfs/HDFSDataStream.java | 19 +-
.../apache/flume/sink/hdfs/HDFSEventSink.java | 76 ++---
.../flume/sink/hdfs/HDFSSequenceFile.java | 7 +-
.../apache/flume/sink/hdfs/KerberosUser.java | 4 +-
.../sink/hdfs/SequenceFileSerializerType.java | 3 +-
.../sink/hive/HiveDelimitedTextSerializer.java | 16 +-
.../flume/sink/hive/HiveEventSerializer.java | 1 -
.../org/apache/flume/sink/hive/HiveSink.java | 30 +-
.../org/apache/flume/sink/hive/HiveWriter.java | 141 ++++----
.../java/org/apache/flume/sink/irc/IRCSink.java | 4 +-
...ElasticSearchIndexRequestBuilderFactory.java | 5 +-
...ElasticSearchIndexRequestBuilderFactory.java | 2 +-
.../sink/elasticsearch/ElasticSearchSink.java | 1 +
...entSerializerIndexRequestBuilderFactory.java | 2 +-
.../client/ElasticSearchClientFactory.java | 6 +-
.../client/ElasticSearchRestClient.java | 23 +-
.../client/ElasticSearchTransportClient.java | 6 +-
.../elasticsearch/client/RoundRobinList.java | 2 +-
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 254 +++++++-------
.../sink/hbase/AsyncHbaseEventSerializer.java | 5 +-
.../org/apache/flume/sink/hbase/HBaseSink.java | 117 +++----
.../flume/sink/hbase/HbaseEventSerializer.java | 10 +-
.../sink/hbase/RegexHbaseEventSerializer.java | 50 +--
.../hbase/SimpleAsyncHbaseEventSerializer.java | 33 +-
.../sink/hbase/SimpleHbaseEventSerializer.java | 73 ++---
.../flume/sink/hbase/SimpleRowKeyGenerator.java | 22 +-
.../org/apache/flume/sink/kafka/KafkaSink.java | 45 +--
.../flume/sink/kafka/KafkaSinkConstants.java | 15 +-
.../sink/solr/morphline/BlobDeserializer.java | 3 +-
.../flume/sink/solr/morphline/BlobHandler.java | 3 +-
.../solr/morphline/MorphlineHandlerImpl.java | 6 +-
.../solr/morphline/MorphlineInterceptor.java | 5 +-
.../sink/solr/morphline/MorphlineSink.java | 8 +-
.../source/jms/DefaultJMSMessageConverter.java | 28 +-
.../flume/source/jms/InitialContextFactory.java | 1 -
.../flume/source/jms/JMSMessageConsumer.java | 84 +++--
.../source/jms/JMSMessageConsumerFactory.java | 10 +-
.../org/apache/flume/source/jms/JMSSource.java | 124 ++++---
.../apache/flume/source/kafka/KafkaSource.java | 55 ++--
.../source/kafka/KafkaSourceConstants.java | 9 +-
.../taildir/ReliableTaildirEventReader.java | 52 +--
.../apache/flume/source/taildir/TailFile.java | 88 +++--
.../flume/source/taildir/TaildirMatcher.java | 106 +++---
.../flume/source/taildir/TaildirSource.java | 18 +-
.../TaildirSourceConfigurationConstants.java | 4 +-
.../TestRpcClientCommunicationFailure.java | 60 ++--
.../flume/tools/FileChannelIntegrityTool.java | 85 +++--
.../org/apache/flume/tools/FlumeToolType.java | 2 +-
.../org/apache/flume/tools/FlumeToolsMain.java | 19 +-
pom.xml | 66 ++++
244 files changed, 3823 insertions(+), 3436 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-checkstyle/pom.xml
----------------------------------------------------------------------
diff --git a/flume-checkstyle/pom.xml b/flume-checkstyle/pom.xml
new file mode 100644
index 0000000..31db3c0
--- /dev/null
+++ b/flume-checkstyle/pom.xml
@@ -0,0 +1,36 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <!--
+ <parent>
+ <artifactId>flume-parent</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.7.0-SNAPSHOT</version>
+ </parent>
+ -->
+
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-checkstyle</artifactId>
+ <name>Flume checkstyle project</name>
+ <version>1.7.0-SNAPSHOT</version>
+</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml
new file mode 100644
index 0000000..49c8834
--- /dev/null
+++ b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+ "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+<suppressions>
+
+ <!-- Suppress all style checks for generated code -->
+ <suppress checks=".*"
+ files="generated-sources|com/cloudera/flume/handlers/thrift|org/apache/flume/thrift/|org/apache/flume/source/scribe|ProtosFactory.java"/>
+
+ <!-- The "legacy" sources have a weird camelCaps package name -->
+ <suppress checks="PackageName"
+ files="org/apache/flume/source/avroLegacy|org/apache/flume/source/thriftLegacy"/>
+
+ <!-- TODO: Rearrange methods in below classes to keep overloaded methods adjacent -->
+ <suppress checks="OverloadMethodsDeclarationOrder"
+ files="channel/file|RpcClientFactory\.java|BucketPath\.java|SinkGroup\.java|DefaultSinkProcessor\.java|RegexExtractorInterceptorMillisSerializer\.java|SimpleAsyncHbaseEventSerializer\.java|hdfs/BucketWriter\.java"/>
+
+ <!-- TODO: Fix inner class names to follow standard convention -->
+ <suppress checks="TypeName"
+ files="SyslogUDPSource\.java|SyslogTcpSource\.java|TaildirSource\.java"/>
+
+ <!-- TODO: Add default cases to switch statements -->
+ <suppress checks="MissingSwitchDefault"
+ files="SyslogUtils\.java|ReliableTaildirEventReader\.java"/>
+
+ <!-- TODO: Avoid empty catch blocks -->
+ <suppress checks="EmptyCatchBlock"
+ files="channel/file/LogFile\.java"/>
+
+ <!-- TODO: Avoid empty if blocks -->
+ <suppress checks="EmptyBlockCheck"
+ files="ElasticSearchClientFactory\.java"/>
+
+ <!-- TODO: Fix line length issues -->
+ <suppress checks="LineLengthCheck"
+ files="channel/MemoryChannel\.java|ReliableSpoolingFileEventReader\.java"/>
+
+ <!-- TODO: Move helper classes to their own files -->
+ <suppress checks="OneTopLevelClass"
+ files="KafkaSource\.java|KafkaChannel\.java|KafkaSink\.java"/>
+
+</suppressions>
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-checkstyle/src/main/resources/flume/checkstyle.xml
----------------------------------------------------------------------
diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle.xml b/flume-checkstyle/src/main/resources/flume/checkstyle.xml
new file mode 100644
index 0000000..e8913f0
--- /dev/null
+++ b/flume-checkstyle/src/main/resources/flume/checkstyle.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0"?>
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+
+<!--
+ Checkstyle configuration that checks the Google coding conventions from Google Java Style
+ that can be found at https://google.github.io/styleguide/javaguide.html.
+
+ Checkstyle is very configurable. Be sure to read the documentation at
+ http://checkstyle.sf.net (or in your downloaded distribution).
+
+ To completely disable a check, just comment it out or delete it from the file.
+
+ Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov.
+ -->
+
+<module name = "Checker">
+ <property name="charset" value="UTF-8"/>
+
+ <property name="severity" value="warning"/>
+
+ <property name="fileExtensions" value="java, properties, xml"/>
+ <!-- Checks for whitespace -->
+ <!-- See http://checkstyle.sf.net/config_whitespace.html -->
+ <module name="FileTabCharacter">
+ <property name="eachLine" value="true"/>
+ </module>
+
+ <module name="TreeWalker">
+ <module name="OuterTypeFilename"/>
+ <module name="IllegalTokenText">
+ <property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
+ <property name="format" value="\\u00(08|09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
+ <property name="message" value="Avoid using corresponding octal or Unicode escape."/>
+ </module>
+ <module name="AvoidEscapedUnicodeCharacters">
+ <property name="allowEscapesForControlCharacters" value="true"/>
+ <property name="allowByTailComment" value="true"/>
+ <property name="allowNonPrintableEscapes" value="true"/>
+ </module>
+ <module name="LineLength">
+ <property name="max" value="100"/>
+ <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
+ </module>
+ <module name="AvoidStarImport">
+ <property name="allowStaticMemberImports" value="true"/>
+ </module>
+ <module name="OneTopLevelClass"/>
+ <module name="NoLineWrap"/>
+ <module name="EmptyBlock">
+ <property name="option" value="TEXT"/>
+ <property name="tokens" value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
+ </module>
+ <module name="NeedBraces">
+ <property name="allowSingleLineStatement" value="true"/>
+ </module>
+ <module name="LeftCurly">
+ <property name="maxLineLength" value="100"/>
+ </module>
+ <module name="RightCurly"/>
+ <module name="RightCurly">
+ <property name="option" value="alone"/>
+ <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT"/>
+ </module>
+ <module name="WhitespaceAround">
+ <property name="allowEmptyConstructors" value="true"/>
+ <property name="allowEmptyMethods" value="true"/>
+ <property name="allowEmptyTypes" value="true"/>
+ <property name="allowEmptyLoops" value="true"/>
+ <message key="ws.notFollowed"
+ value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/>
+ <message key="ws.notPreceded"
+ value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
+ </module>
+ <module name="OneStatementPerLine"/>
+ <module name="ArrayTypeStyle"/>
+ <module name="MissingSwitchDefault"/>
+ <module name="FallThrough"/>
+ <module name="UpperEll"/>
+ <module name="ModifierOrder"/>
+ <module name="EmptyLineSeparator">
+ <property name="allowNoEmptyLineBetweenFields" value="true"/>
+ <property name="allowMultipleEmptyLines" value="false"/>
+ <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
+ <property name="tokens" value="IMPORT, CLASS_DEF, INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, CTOR_DEF, VARIABLE_DEF"/>
+ </module>
+ <module name="SeparatorWrap">
+ <property name="tokens" value="DOT"/>
+ <property name="option" value="nl"/>
+ </module>
+ <module name="SeparatorWrap">
+ <property name="tokens" value="COMMA"/>
+ <property name="option" value="EOL"/>
+ </module>
+ <module name="PackageName">
+ <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
+ <message key="name.invalidPattern"
+ value="Package name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="TypeName">
+ <message key="name.invalidPattern"
+ value="Type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="ClassTypeParameterName">
+ <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+ <message key="name.invalidPattern"
+ value="Class type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="MethodTypeParameterName">
+ <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+ <message key="name.invalidPattern"
+ value="Method type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="InterfaceTypeParameterName">
+ <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+ <message key="name.invalidPattern"
+ value="Interface type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="NoFinalizer"/>
+ <module name="GenericWhitespace">
+ <message key="ws.followed"
+ value="GenericWhitespace ''{0}'' is followed by whitespace."/>
+ <message key="ws.preceded"
+ value="GenericWhitespace ''{0}'' is preceded with whitespace."/>
+ <message key="ws.illegalFollow"
+ value="GenericWhitespace ''{0}'' should be followed by whitespace."/>
+ <message key="ws.notPreceded"
+ value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
+ </module>
+ <module name="Indentation">
+ <property name="basicOffset" value="2"/>
+ <property name="braceAdjustment" value="0"/>
+ <property name="caseIndent" value="2"/>
+ <property name="throwsIndent" value="4"/>
+ <property name="lineWrappingIndentation" value="4"/>
+ <property name="arrayInitIndent" value="2"/>
+ </module>
+ <module name="OverloadMethodsDeclarationOrder"/>
+ <module name="MethodParamPad"/>
+ <module name="AnnotationLocation">
+ <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF"/>
+ </module>
+ <module name="AnnotationLocation">
+ <property name="tokens" value="VARIABLE_DEF"/>
+ <property name="allowSamelineMultipleAnnotations" value="true"/>
+ </module>
+ <module name="AtclauseOrder">
+ <property name="tagOrder" value="@param, @return, @throws, @deprecated"/>
+ <property name="target" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
+ </module>
+ <module name="JavadocMethod">
+ <property name="scope" value="public"/>
+ <property name="allowMissingJavadoc" value="true"/>
+ <property name="allowMissingParamTags" value="true"/>
+ <property name="allowMissingThrowsTags" value="true"/>
+ <property name="allowMissingReturnTag" value="true"/>
+ <property name="minLineCount" value="0"/>
+ <property name="allowedAnnotations" value="Override, Test"/>
+ <property name="allowThrowsTagsForSubclasses" value="true"/>
+ </module>
+ <module name="MethodName">
+ <property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9_]*$"/>
+ <message key="name.invalidPattern"
+ value="Method name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="SingleLineJavadoc">
+ <property name="ignoreInlineTags" value="false"/>
+ </module>
+ <module name="EmptyCatchBlock">
+ <property name="exceptionVariableName" value="expected"/>
+ </module>
+ <module name="CommentsIndentation">
+ <property name="tokens" value="BLOCK_COMMENT_BEGIN"/>
+ </module>
+ </module>
+</module>
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
index c976458..35356cd 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java
@@ -26,8 +26,8 @@ import java.util.Properties;
public class SecureRpcClientFactory {
/**
- * Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating with
- * the next hop.
+ * Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating
+ * with the next hop.
* @param props
* @return - An {@linkplain org.apache.flume.api.RpcClient} which uses thrift configured with the
* given parameters.
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
index f31582c..395bc1f 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java
@@ -22,7 +22,10 @@ import org.apache.flume.FlumeException;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.FlumeAuthenticator;
import org.apache.flume.auth.PrivilegedExecutor;
-import org.apache.thrift.transport.*;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
@@ -52,9 +55,9 @@ public class SecureThriftRpcClient extends ThriftRpcClient {
String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL);
String keytab = properties.getProperty(CLIENT_KEYTAB);
this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab);
- if(!privilegedExecutor.isAuthenticated()) {
+ if (!privilegedExecutor.isAuthenticated()) {
throw new FlumeException("Authentication failed in Kerberos mode for " +
- "principal " + clientPrincipal + " keytab " + keytab);
+ "principal " + clientPrincipal + " keytab " + keytab);
}
}
@@ -78,31 +81,33 @@ public class SecureThriftRpcClient extends ThriftRpcClient {
*/
public static class UgiSaslClientTransport extends TSaslClientTransport {
PrivilegedExecutor privilegedExecutor;
+
public UgiSaslClientTransport(String mechanism, String authorizationId,
String protocol, String serverName, Map<String, String> props,
- CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor) throws IOException {
- super(mechanism, authorizationId, protocol, serverName, props, cbh,
- transport);
+ CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor)
+ throws IOException {
+ super(mechanism, authorizationId, protocol, serverName, props, cbh, transport);
this.privilegedExecutor = privilegedExecutor;
}
- // open the SASL transport with using the current UserGroupInformation
- // This is needed to get the current login context stored
+ /**
+ * Open the SASL transport using the current UserGroupInformation.
+ * This is needed to get the current login context stored
+ */
@Override
public void open() throws FlumeException {
try {
this.privilegedExecutor.execute(
- new PrivilegedExceptionAction<Void>() {
- public Void run() throws FlumeException {
- // this is a workaround to using UgiSaslClientTransport.super.open()
- // which results in IllegalAccessError
- callSuperClassOpen();
- return null;
- }
- });
+ new PrivilegedExceptionAction<Void>() {
+ public Void run() throws FlumeException {
+ // this is a workaround to using UgiSaslClientTransport.super.open()
+ // which results in IllegalAccessError
+ callSuperClassOpen();
+ return null;
+ }
+ });
} catch (InterruptedException e) {
- throw new FlumeException(
- "Interrupted while opening underlying transport", e);
+ throw new FlumeException("Interrupted while opening underlying transport", e);
} catch (Exception e) {
throw new FlumeException("Failed to open SASL transport", e);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
index 5627652..87cef31 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java
@@ -50,10 +50,10 @@ public class FlumeAuthenticationUtil {
*
* @throws org.apache.flume.auth.SecurityException
*/
- public synchronized static FlumeAuthenticator getAuthenticator(
+ public static synchronized FlumeAuthenticator getAuthenticator(
String principal, String keytab) throws SecurityException {
- if(principal == null && keytab == null) {
+ if (principal == null && keytab == null) {
return SimpleAuthenticator.getSimpleAuthenticator();
}
@@ -62,7 +62,7 @@ public class FlumeAuthenticationUtil {
Preconditions.checkArgument(keytab != null,
"Keytab can not be null when Principal is provided");
- if(kerbAuthenticator == null) {
+ if (kerbAuthenticator == null) {
kerbAuthenticator = new KerberosAuthenticator();
}
kerbAuthenticator.authenticate(principal, keytab);
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
index 4a0e0f4..45091f5 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java
@@ -18,6 +18,7 @@
package org.apache.flume.auth;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import com.google.common.base.Preconditions;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
/**
@@ -66,10 +66,10 @@ class KerberosAuthenticator implements FlumeAuthenticator {
@Override
public synchronized PrivilegedExecutor proxyAs(String proxyUserName) {
- if(proxyUserName == null || proxyUserName.isEmpty()) {
+ if (proxyUserName == null || proxyUserName.isEmpty()) {
return this;
}
- if(proxyCache.get(proxyUserName) == null) {
+ if (proxyCache.get(proxyUserName) == null) {
UserGroupInformation proxyUgi;
proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, ugi);
printUGI(proxyUgi);
@@ -131,13 +131,13 @@ class KerberosAuthenticator implements FlumeAuthenticator {
KerberosUser newUser = new KerberosUser(resolvedPrincipal, keytab);
Preconditions.checkState(prevUser == null || prevUser.equals(newUser),
- "Cannot use multiple kerberos principals in the same agent. " +
- " Must restart agent to use new principal or keytab. " +
- "Previous = %s, New = %s", prevUser, newUser);
+ "Cannot use multiple kerberos principals in the same agent. " +
+ " Must restart agent to use new principal or keytab. " +
+ "Previous = %s, New = %s", prevUser, newUser);
// enable the kerberos mode of UGI, before doing anything else
- if(!UserGroupInformation.isSecurityEnabled()) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
Configuration conf = new Configuration(false);
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
@@ -147,7 +147,7 @@ class KerberosAuthenticator implements FlumeAuthenticator {
UserGroupInformation curUser = null;
try {
curUser = UserGroupInformation.getLoginUser();
- if(curUser != null && !curUser.hasKerberosCredentials()) {
+ if (curUser != null && !curUser.hasKerberosCredentials()) {
curUser = null;
}
} catch (IOException e) {
@@ -166,8 +166,8 @@ class KerberosAuthenticator implements FlumeAuthenticator {
if (curUser != null && curUser.getUserName().equals(ugi.getUserName())) {
LOG.debug("Using existing principal login: {}", ugi);
} else {
- LOG.info("Attempting kerberos Re-login as principal ({}) "
- , new Object[] { ugi.getUserName() } );
+ LOG.info("Attempting kerberos Re-login as principal ({}) ",
+ new Object[] { ugi.getUserName() } );
ugi.reloginFromKeytab();
}
} else {
@@ -192,9 +192,10 @@ class KerberosAuthenticator implements FlumeAuthenticator {
// dump login information
AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
LOG.info("\n{} \nUser: {} \nAuth method: {} \nKeytab: {} \n",
- new Object[]{ authMethod.equals(AuthenticationMethod.PROXY) ?
- "Proxy as: " : "Logged as: ", ugi.getUserName(), authMethod,
- ugi.isFromKeytab() }
+ new Object[] {
+ authMethod.equals(AuthenticationMethod.PROXY) ? "Proxy as: " : "Logged as: ",
+ ugi.getUserName(), authMethod, ugi.isFromKeytab()
+ }
);
}
}
@@ -224,7 +225,7 @@ class KerberosAuthenticator implements FlumeAuthenticator {
@VisibleForTesting
String getUserName() {
- if(ugi != null) {
+ if (ugi != null) {
return ugi.getUserName();
} else {
return null;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java
index dd37721..22852de 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java
@@ -46,7 +46,9 @@ public class KerberosUser {
return false;
}
final KerberosUser other = (KerberosUser) obj;
- if ((this.principal == null) ? (other.principal != null) : !this.principal.equals(other.principal)) {
+ if ((this.principal == null) ?
+ (other.principal != null) :
+ !this.principal.equals(other.principal)) {
return false;
}
if ((this.keyTab == null) ? (other.keyTab != null) : !this.keyTab.equals(other.keyTab)) {
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
index f7b5bea..d2791a1 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java
@@ -57,10 +57,10 @@ class SimpleAuthenticator implements FlumeAuthenticator {
@Override
public synchronized PrivilegedExecutor proxyAs(String proxyUserName) {
- if(proxyUserName == null || proxyUserName.isEmpty()) {
+ if (proxyUserName == null || proxyUserName.isEmpty()) {
return this;
}
- if(proxyCache.get(proxyUserName) == null) {
+ if (proxyCache.get(proxyUserName) == null) {
UserGroupInformation proxyUgi;
try {
proxyUgi = UserGroupInformation.createProxyUser(proxyUserName,
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
index cd62b91..a6ebd86 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
@@ -55,7 +55,7 @@ class UGIExecutor implements PrivilegedExecutor {
private void ensureValidAuth() {
reloginUGI(ugi);
- if(ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) {
+ if (ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) {
reloginUGI(ugi.getRealUser());
}
}
@@ -70,9 +70,9 @@ class UGIExecutor implements PrivilegedExecutor {
*/
private void reloginUGI(UserGroupInformation ugi) {
try {
- if(ugi.hasKerberosCredentials()) {
+ if (ugi.hasKerberosCredentials()) {
long now = System.currentTimeMillis();
- if(now - lastReloginAttempt < MIN_TIME_BEFORE_RELOGIN) {
+ if (now - lastReloginAttempt < MIN_TIME_BEFORE_RELOGIN) {
return;
}
lastReloginAttempt = now;
@@ -86,7 +86,7 @@ class UGIExecutor implements PrivilegedExecutor {
@VisibleForTesting
String getUserName() {
- if(ugi != null) {
+ if (ugi != null) {
return ugi.getUserName();
} else {
return null;
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
index 588506a..b75c29e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
@@ -24,12 +24,13 @@ import org.apache.flume.FlumeException;
* Exception thrown when the checkpoint directory contains invalid data,
* probably due to the channel stopping while the checkpoint was written.
*/
-public class BadCheckpointException extends FlumeException{
+public class BadCheckpointException extends FlumeException {
private static final long serialVersionUID = -5038652693746472779L;
public BadCheckpointException(String msg) {
super(msg);
}
+
public BadCheckpointException(String msg, Throwable t) {
super(msg, t);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index b961ae2..a0ecdeb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -23,12 +23,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -37,26 +31,28 @@ import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
public class CheckpointRebuilder {
private final List<File> logFiles;
private final FlumeEventQueue queue;
- private final Set<ComparableFlumeEventPointer> committedPuts =
- Sets.newHashSet();
- private final Set<ComparableFlumeEventPointer> pendingTakes =
- Sets.newHashSet();
+ private final Set<ComparableFlumeEventPointer> committedPuts = Sets.newHashSet();
+ private final Set<ComparableFlumeEventPointer> pendingTakes = Sets.newHashSet();
private final SetMultimap<Long, ComparableFlumeEventPointer> uncommittedPuts =
- HashMultimap.create();
+ HashMultimap.create();
private final SetMultimap<Long, ComparableFlumeEventPointer>
- uncommittedTakes = HashMultimap.create();
+ uncommittedTakes = HashMultimap.create();
private final boolean fsyncPerTransaction;
- private static Logger LOG =
- LoggerFactory.getLogger(CheckpointRebuilder.class);
+ private static Logger LOG = LoggerFactory.getLogger(CheckpointRebuilder.class);
- public CheckpointRebuilder(List<File> logFiles,
- FlumeEventQueue queue, boolean fsyncPerTransaction) throws
- IOException {
+ public CheckpointRebuilder(List<File> logFiles, FlumeEventQueue queue,
+ boolean fsyncPerTransaction) throws IOException {
this.logFiles = logFiles;
this.queue = queue;
this.fsyncPerTransaction = fsyncPerTransaction;
@@ -68,8 +64,8 @@ public class CheckpointRebuilder {
for (File logFile : logFiles) {
try {
logReaders.add(LogFileFactory.getSequentialReader(logFile, null,
- fsyncPerTransaction));
- } catch(EOFException e) {
+ fsyncPerTransaction));
+ } catch (EOFException e) {
LOG.warn("Ignoring " + logFile + " due to EOF", e);
}
}
@@ -84,27 +80,24 @@ public class CheckpointRebuilder {
TransactionEventRecord record = entry.getEvent();
long trans = record.getTransactionID();
long writeOrderID = record.getLogWriteOrderID();
- transactionIDSeed = Math.max(trans, transactionIDSeed);
- writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed);
+ transactionIDSeed = Math.max(trans, transactionIDSeed);
+ writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed);
if (record.getRecordType() == TransactionEventRecord.Type.PUT.get()) {
uncommittedPuts.put(record.getTransactionID(),
- new ComparableFlumeEventPointer(
+ new ComparableFlumeEventPointer(
new FlumeEventPointer(fileID, offset),
record.getLogWriteOrderID()));
- } else if (record.getRecordType()
- == TransactionEventRecord.Type.TAKE.get()) {
+ } else if (record.getRecordType() == TransactionEventRecord.Type.TAKE.get()) {
Take take = (Take) record;
uncommittedTakes.put(record.getTransactionID(),
- new ComparableFlumeEventPointer(
+ new ComparableFlumeEventPointer(
new FlumeEventPointer(take.getFileID(), take.getOffset()),
record.getLogWriteOrderID()));
- } else if (record.getRecordType()
- == TransactionEventRecord.Type.COMMIT.get()) {
+ } else if (record.getRecordType() == TransactionEventRecord.Type.COMMIT.get()) {
Commit commit = (Commit) record;
- if (commit.getType()
- == TransactionEventRecord.Type.PUT.get()) {
+ if (commit.getType() == TransactionEventRecord.Type.PUT.get()) {
Set<ComparableFlumeEventPointer> puts =
- uncommittedPuts.get(record.getTransactionID());
+ uncommittedPuts.get(record.getTransactionID());
if (puts != null) {
for (ComparableFlumeEventPointer put : puts) {
if (!pendingTakes.remove(put)) {
@@ -114,7 +107,7 @@ public class CheckpointRebuilder {
}
} else {
Set<ComparableFlumeEventPointer> takes =
- uncommittedTakes.get(record.getTransactionID());
+ uncommittedTakes.get(record.getTransactionID());
if (takes != null) {
for (ComparableFlumeEventPointer take : takes) {
if (!committedPuts.remove(take)) {
@@ -123,8 +116,7 @@ public class CheckpointRebuilder {
}
}
}
- } else if (record.getRecordType()
- == TransactionEventRecord.Type.ROLLBACK.get()) {
+ } else if (record.getRecordType() == TransactionEventRecord.Type.ROLLBACK.get()) {
if (uncommittedPuts.containsKey(record.getTransactionID())) {
uncommittedPuts.removeAll(record.getTransactionID());
} else {
@@ -134,18 +126,16 @@ public class CheckpointRebuilder {
}
}
} catch (Exception e) {
- LOG.warn("Error while generating checkpoint "
- + "using fast generation logic", e);
+ LOG.warn("Error while generating checkpoint using fast generation logic", e);
return false;
} finally {
- TransactionIDOracle.setSeed(transactionIDSeed);
- WriteOrderOracle.setSeed(writeOrderIDSeed);
+ TransactionIDOracle.setSeed(transactionIDSeed);
+ WriteOrderOracle.setSeed(writeOrderIDSeed);
for (LogFile.SequentialReader reader : logReaders) {
reader.close();
}
}
- Set<ComparableFlumeEventPointer> sortedPuts =
- Sets.newTreeSet(committedPuts);
+ Set<ComparableFlumeEventPointer> sortedPuts = Sets.newTreeSet(committedPuts);
int count = 0;
for (ComparableFlumeEventPointer put : sortedPuts) {
queue.addTail(put.pointer);
@@ -159,9 +149,9 @@ public class CheckpointRebuilder {
long checkpointLogOrderID = 0;
List<LogFile.MetaDataWriter> metaDataWriters = Lists.newArrayList();
for (File logFile : logFiles) {
- String name = logFile.getName();
- metaDataWriters.add(LogFileFactory.getMetaDataWriter(logFile,
- Integer.parseInt(name.substring(name.lastIndexOf('-') + 1))));
+ String name = logFile.getName();
+ metaDataWriters.add(LogFileFactory.getMetaDataWriter(logFile,
+ Integer.parseInt(name.substring(name.lastIndexOf('-') + 1))));
}
try {
if (queue.checkpoint(true)) {
@@ -171,8 +161,7 @@ public class CheckpointRebuilder {
}
}
} catch (Exception e) {
- LOG.warn("Error while generating checkpoint "
- + "using fast generation logic", e);
+ LOG.warn("Error while generating checkpoint using fast generation logic", e);
} finally {
for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) {
metaDataWriter.close();
@@ -181,14 +170,14 @@ public class CheckpointRebuilder {
}
private final class ComparableFlumeEventPointer
- implements Comparable<ComparableFlumeEventPointer> {
+ implements Comparable<ComparableFlumeEventPointer> {
private final FlumeEventPointer pointer;
private final long orderID;
- public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){
+ public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID) {
Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be"
- + "null while creating a ComparableFlumeEventPointer");
+ + "null while creating a ComparableFlumeEventPointer");
this.pointer = pointer;
this.orderID = orderID;
}
@@ -204,22 +193,22 @@ public class CheckpointRebuilder {
}
@Override
- public int hashCode(){
+ public int hashCode() {
return pointer.hashCode();
}
@Override
- public boolean equals(Object o){
- if(this == o){
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
}
- if(o == null){
+ if (o == null) {
return false;
}
- if(o.getClass() != this.getClass()){
+ if (o.getClass() != this.getClass()) {
return false;
}
- return pointer.equals(((ComparableFlumeEventPointer)o).pointer);
+ return pointer.equals(((ComparableFlumeEventPointer) o).pointer);
}
}
@@ -245,20 +234,19 @@ public class CheckpointRebuilder {
}
int capacity = Integer.parseInt(cli.getOptionValue("t"));
File checkpointFile = new File(checkpointDir, "checkpoint");
- if(checkpointFile.exists()) {
+ if (checkpointFile.exists()) {
LOG.error("Cannot execute fast replay",
- new IllegalStateException("Checkpoint exists" + checkpointFile));
+ new IllegalStateException("Checkpoint exists" + checkpointFile));
} else {
EventQueueBackingStore backingStore =
EventQueueBackingStoreFactory.get(checkpointFile,
capacity, "channel");
FlumeEventQueue queue = new FlumeEventQueue(backingStore,
- new File(checkpointDir, "inflighttakes"),
- new File(checkpointDir, "inflightputs"),
- new File(checkpointDir, Log.QUEUE_SET));
- CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles,
- queue, true);
- if(rebuilder.rebuild()) {
+ new File(checkpointDir, "inflighttakes"),
+ new File(checkpointDir, "inflightputs"),
+ new File(checkpointDir, Log.QUEUE_SET));
+ CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue, true);
+ if (rebuilder.rebuild()) {
rebuilder.writeCheckpoint();
} else {
LOG.error("Could not rebuild the checkpoint due to errors.");
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
index 3663244..8fd53cc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
@@ -36,44 +36,52 @@ class Commit extends TransactionEventRecord {
* Type of Commit Take|Put
*/
private short type;
+
Commit(Long transactionID, Long logWriteOrderID) {
super(transactionID, logWriteOrderID);
}
+
Commit(Long transactionID, Long logWriteOrderID, short type) {
this(transactionID, logWriteOrderID);
this.type = type;
}
+
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
type = in.readShort();
}
+
@Override
void writeProtos(OutputStream out) throws IOException {
- ProtosFactory.Commit.Builder commitBuilder =
- ProtosFactory.Commit.newBuilder();
+ ProtosFactory.Commit.Builder commitBuilder = ProtosFactory.Commit.newBuilder();
commitBuilder.setType(type);
commitBuilder.build().writeDelimitedTo(out);
}
+
@Override
void readProtos(InputStream in) throws IOException {
- ProtosFactory.Commit commit = Preconditions.checkNotNull(ProtosFactory.
- Commit.parseDelimitedFrom(in), "Commit cannot be null");
+ ProtosFactory.Commit commit =
+ Preconditions.checkNotNull(ProtosFactory.Commit.parseDelimitedFrom(in),
+ "Commit cannot be null");
type = (short) commit.getType();
}
short getType() {
return type;
}
+
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeShort(type);
}
+
@Override
short getRecordType() {
return Type.COMMIT.get();
}
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
index 691d291..5438f2e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java
@@ -18,10 +18,9 @@
*/
package org.apache.flume.channel.file;
-
public class CorruptEventException extends Exception {
-
private static final long serialVersionUID = -2986946303540798416L;
+
public CorruptEventException() {
super();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index 456df34..dcd6f98 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -18,92 +18,91 @@
*/
package org.apache.flume.channel.file;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
+import com.google.common.io.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
class EventQueueBackingStoreFactory {
- private static final Logger LOG = LoggerFactory
- .getLogger(EventQueueBackingStoreFactory.class);
+ private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFactory.class);
+
private EventQueueBackingStoreFactory() {}
+
static EventQueueBackingStore get(File checkpointFile, int capacity,
- String name) throws Exception {
+ String name) throws Exception {
return get(checkpointFile, capacity, name, true);
}
static EventQueueBackingStore get(File checkpointFile, int capacity,
- String name, boolean upgrade) throws Exception {
+ String name, boolean upgrade) throws Exception {
return get(checkpointFile, null, capacity, name, upgrade, false, false);
}
- static EventQueueBackingStore get(File checkpointFile,
- File backupCheckpointDir, int capacity,String name,
- boolean upgrade, boolean shouldBackup, boolean compressBackup)
- throws Exception {
+
+ static EventQueueBackingStore get(File checkpointFile, File backupCheckpointDir,
+ int capacity, String name, boolean upgrade,
+ boolean shouldBackup, boolean compressBackup) throws Exception {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
RandomAccessFile checkpointFileHandle = null;
try {
boolean checkpointExists = checkpointFile.exists();
boolean metaDataExists = metaDataFile.exists();
- if(metaDataExists) {
+ if (metaDataExists) {
// if we have a metadata file but no checkpoint file, we have a problem
// delete everything in the checkpoint directory and force
// a full replay.
- if(!checkpointExists || checkpointFile.length() == 0) {
+ if (!checkpointExists || checkpointFile.length() == 0) {
LOG.warn("MetaData file for checkpoint "
- + " exists but checkpoint does not. Checkpoint = " + checkpointFile
- + ", metaDataFile = " + metaDataFile);
+ + " exists but checkpoint does not. Checkpoint = " + checkpointFile
+ + ", metaDataFile = " + metaDataFile);
throw new BadCheckpointException(
- "The last checkpoint was not completed correctly, " +
- "since Checkpoint file does not exist while metadata " +
- "file does.");
+ "The last checkpoint was not completed correctly, " +
+ "since Checkpoint file does not exist while metadata " +
+ "file does.");
}
}
// brand new, use v3
- if(!checkpointExists) {
- if(!checkpointFile.createNewFile()) {
+ if (!checkpointExists) {
+ if (!checkpointFile.createNewFile()) {
throw new IOException("Cannot create " + checkpointFile);
}
return new EventQueueBackingStoreFileV3(checkpointFile,
capacity, name, backupCheckpointDir, shouldBackup, compressBackup);
}
// v3 due to meta file, version will be checked by backing store
- if(metaDataExists) {
+ if (metaDataExists) {
return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
- name, backupCheckpointDir, shouldBackup, compressBackup);
+ name, backupCheckpointDir, shouldBackup, compressBackup);
}
checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
- int version = (int)checkpointFileHandle.readLong();
- if(Serialization.VERSION_2 == version) {
- if(upgrade) {
+ int version = (int) checkpointFileHandle.readLong();
+ if (Serialization.VERSION_2 == version) {
+ if (upgrade) {
return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
- shouldBackup, compressBackup);
+ shouldBackup, compressBackup);
}
return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
}
LOG.error("Found version " + Integer.toHexString(version) + " in " +
checkpointFile);
throw new BadCheckpointException("Checkpoint file exists with " +
- Serialization.VERSION_3 + " but no metadata file found.");
+ Serialization.VERSION_3 + " but no metadata file found.");
} finally {
- if(checkpointFileHandle != null) {
+ if (checkpointFileHandle != null) {
try {
checkpointFileHandle.close();
- } catch(IOException e) {
+ } catch (IOException e) {
LOG.warn("Unable to close " + checkpointFile, e);
}
}
}
}
- private static EventQueueBackingStore upgrade(File checkpointFile,
- int capacity, String name, File backupCheckpointDir,
- boolean shouldBackup, boolean compressBackup)
- throws Exception {
+ private static EventQueueBackingStore upgrade(File checkpointFile, int capacity, String name,
+ File backupCheckpointDir, boolean shouldBackup,
+ boolean compressBackup) throws Exception {
LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
EventQueueBackingStoreFileV2 backingStoreV2 =
new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
@@ -115,7 +114,7 @@ class EventQueueBackingStoreFactory {
EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
metaDataFile);
return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
- backupCheckpointDir, shouldBackup, compressBackup);
+ backupCheckpointDir, shouldBackup, compressBackup);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 2b0987b..73f1d4c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -18,6 +18,15 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -34,21 +43,9 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
-
-
abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
- private static final Logger LOG = LoggerFactory
- .getLogger(EventQueueBackingStoreFile.class);
- private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB
+ private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFile.class);
+ private static final int MAX_ALLOC_BUFFER_SIZE = 2 * 1024 * 1024; // 2MB
protected static final int HEADER_SIZE = 1029;
protected static final int INDEX_VERSION = 0;
protected static final int INDEX_WRITE_ORDER_ID = 1;
@@ -71,15 +68,15 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
private final ExecutorService checkpointBackUpExecutor;
protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile) throws IOException,
+ File checkpointFile) throws IOException,
BadCheckpointException {
this(capacity, name, checkpointFile, null, false, false);
}
protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile, File checkpointBackupDir,
- boolean backupCheckpoint, boolean compressBackup)
- throws IOException, BadCheckpointException {
+ File checkpointFile, File checkpointBackupDir,
+ boolean backupCheckpoint, boolean compressBackup)
+ throws IOException, BadCheckpointException {
super(capacity, name);
this.checkpointFile = checkpointFile;
this.shouldBackup = backupCheckpoint;
@@ -87,7 +84,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
this.backupDir = checkpointBackupDir;
checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
- if(checkpointFileHandle.length() == 0) {
+ if (checkpointFileHandle.length() == 0) {
allocate(checkpointFile, totalBytes);
checkpointFileHandle.seek(INDEX_VERSION * Serialization.SIZE_OF_LONG);
checkpointFileHandle.writeLong(getVersion());
@@ -95,7 +92,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
LOG.info("Preallocated " + checkpointFile + " to " + checkpointFileHandle.length()
+ " for capacity " + capacity);
}
- if(checkpointFile.length() != totalBytes) {
+ if (checkpointFile.length() != totalBytes) {
String msg = "Configured capacity is " + capacity + " but the "
+ " checkpoint file capacity is " +
((checkpointFile.length() / Serialization.SIZE_OF_LONG) - HEADER_SIZE)
@@ -108,20 +105,20 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
elementsBuffer = mappedBuffer.asLongBuffer();
long version = elementsBuffer.get(INDEX_VERSION);
- if(version != (long) getVersion()) {
+ if (version != (long) getVersion()) {
throw new BadCheckpointException("Invalid version: " + version + " " +
- name + ", expected " + getVersion());
+ name + ", expected " + getVersion());
}
long checkpointComplete = elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
- if(checkpointComplete != (long) CHECKPOINT_COMPLETE) {
+ if (checkpointComplete != (long) CHECKPOINT_COMPLETE) {
throw new BadCheckpointException("Checkpoint was not completed correctly,"
- + " probably because the agent stopped while the channel was"
- + " checkpointing.");
+ + " probably because the agent stopped while the channel was"
+ + " checkpointing.");
}
if (shouldBackup) {
checkpointBackUpExecutor = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat(
- getName() + " - CheckpointBackUpThread").build());
+ new ThreadFactoryBuilder().setNameFormat(
+ getName() + " - CheckpointBackUpThread").build());
} else {
checkpointBackUpExecutor = null;
}
@@ -142,13 +139,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
* @param backupDirectory - the directory to which the backup files should be
* copied.
* @throws IOException - if the copy failed, or if there is not enough disk
- * space to copy the checkpoint files over.
+ * space to copy the checkpoint files over.
*/
protected void backupCheckpoint(File backupDirectory) throws IOException {
int availablePermits = backupCompletedSema.drainPermits();
Preconditions.checkState(availablePermits == 0,
- "Expected no permits to be available in the backup semaphore, " +
- "but " + availablePermits + " permits were available.");
+ "Expected no permits to be available in the backup semaphore, " +
+ "but " + availablePermits + " permits were available.");
if (slowdownBackup) {
try {
TimeUnit.SECONDS.sleep(10);
@@ -160,45 +157,45 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
if (backupExists(backupDirectory)) {
if (!backupFile.delete()) {
throw new IOException("Error while doing backup of checkpoint. Could " +
- "not remove" + backupFile.toString() + ".");
+ "not remove" + backupFile.toString() + ".");
}
}
Serialization.deleteAllFiles(backupDirectory, Log.EXCLUDES);
File checkpointDir = checkpointFile.getParentFile();
File[] checkpointFiles = checkpointDir.listFiles();
Preconditions.checkNotNull(checkpointFiles, "Could not retrieve files " +
- "from the checkpoint directory. Cannot complete backup of the " +
- "checkpoint.");
+ "from the checkpoint directory. Cannot complete backup of the " +
+ "checkpoint.");
for (File origFile : checkpointFiles) {
- if(Log.EXCLUDES.contains(origFile.getName())) {
+ if (Log.EXCLUDES.contains(origFile.getName())) {
continue;
}
if (compressBackup && origFile.equals(checkpointFile)) {
Serialization.compressFile(origFile, new File(backupDirectory,
- origFile.getName() + COMPRESSED_FILE_EXTENSION));
+ origFile.getName() + COMPRESSED_FILE_EXTENSION));
} else {
Serialization.copyFile(origFile, new File(backupDirectory,
- origFile.getName()));
+ origFile.getName()));
}
}
Preconditions.checkState(!backupFile.exists(), "The backup file exists " +
- "while it is not supposed to. Are multiple channels configured to use " +
- "this directory: " + backupDirectory.toString() + " as backup?");
+ "while it is not supposed to. Are multiple channels configured to use " +
+ "this directory: " + backupDirectory.toString() + " as backup?");
if (!backupFile.createNewFile()) {
LOG.error("Could not create backup file. Backup of checkpoint will " +
- "not be used during replay even if checkpoint is bad.");
+ "not be used during replay even if checkpoint is bad.");
}
}
/**
* Restore the checkpoint, if it is found to be bad.
+ *
* @return true - if the previous backup was successfully completed and
* restore was successfully completed.
* @throws IOException - If restore failed due to IOException
- *
*/
public static boolean restoreBackup(File checkpointDir, File backupDir)
- throws IOException {
+ throws IOException {
if (!backupExists(backupDir)) {
return false;
}
@@ -210,14 +207,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
for (File backupFile : backupFiles) {
String fileName = backupFile.getName();
if (!fileName.equals(BACKUP_COMPLETE_FILENAME) &&
- !fileName.equals(Log.FILE_LOCK)) {
- if (fileName.endsWith(COMPRESSED_FILE_EXTENSION)){
+ !fileName.equals(Log.FILE_LOCK)) {
+ if (fileName.endsWith(COMPRESSED_FILE_EXTENSION)) {
Serialization.decompressFile(
- backupFile, new File(checkpointDir,
- fileName.substring(0, fileName.lastIndexOf("."))));
+ backupFile, new File(checkpointDir,
+ fileName.substring(0, fileName.lastIndexOf("."))));
} else {
Serialization.copyFile(backupFile, new File(checkpointDir,
- fileName));
+ fileName));
}
}
}
@@ -233,14 +230,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
if (shouldBackup) {
int permits = backupCompletedSema.drainPermits();
Preconditions.checkState(permits <= 1, "Expected only one or less " +
- "permits to checkpoint, but got " + String.valueOf(permits) +
- " permits");
- if(permits < 1) {
+ "permits to checkpoint, but got " + String.valueOf(permits) +
+ " permits");
+ if (permits < 1) {
// Force the checkpoint to not happen by throwing an exception.
throw new IOException("Previous backup of checkpoint files is still " +
- "in progress. Will attempt to checkpoint only at the end of the " +
- "next checkpoint interval. Try increasing the checkpoint interval " +
- "if this error happens often.");
+ "in progress. Will attempt to checkpoint only at the end of the " +
+ "next checkpoint interval. Try increasing the checkpoint interval " +
+ "if this error happens often.");
}
}
// Start checkpoint
@@ -249,12 +246,12 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
}
@Override
- void checkpoint() throws IOException {
+ void checkpoint() throws IOException {
setLogWriteOrderID(WriteOrderOracle.next());
LOG.info("Updating checkpoint metadata: logWriteOrderID: "
+ getLogWriteOrderID() + ", queueSize: " + getSize() + ", queueHead: "
- + getHead());
+ + getHead());
elementsBuffer.put(INDEX_WRITE_ORDER_ID, getLogWriteOrderID());
try {
writeCheckpointMetaData();
@@ -286,8 +283,8 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
*/
private void startBackupThread() {
Preconditions.checkNotNull(checkpointBackUpExecutor,
- "Expected the checkpoint backup exector to be non-null, " +
- "but it is null. Checkpoint will not be backed up.");
+ "Expected the checkpoint backup exector to be non-null, " +
+ "but it is null. Checkpoint will not be backed up.");
LOG.info("Attempting to back up checkpoint.");
checkpointBackUpExecutor.submit(new Runnable() {
@@ -317,16 +314,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
} catch (IOException e) {
LOG.info("Error closing " + checkpointFile, e);
}
- if(checkpointBackUpExecutor != null && !checkpointBackUpExecutor
- .isShutdown()) {
+ if (checkpointBackUpExecutor != null && !checkpointBackUpExecutor.isShutdown()) {
checkpointBackUpExecutor.shutdown();
try {
// Wait till the executor dies.
- while (!checkpointBackUpExecutor.awaitTermination(1,
- TimeUnit.SECONDS));
+ while (!checkpointBackUpExecutor.awaitTermination(1, TimeUnit.SECONDS)) {}
} catch (InterruptedException ex) {
LOG.warn("Interrupted while waiting for checkpoint backup to " +
- "complete");
+ "complete");
}
}
}
@@ -362,18 +357,19 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
@Override
protected void incrementFileID(int fileID) {
AtomicInteger counter = logFileIDReferenceCounts.get(fileID);
- if(counter == null) {
+ if (counter == null) {
counter = new AtomicInteger(0);
logFileIDReferenceCounts.put(fileID, counter);
}
counter.incrementAndGet();
}
+
@Override
protected void decrementFileID(int fileID) {
AtomicInteger counter = logFileIDReferenceCounts.get(fileID);
Preconditions.checkState(counter != null, "null counter ");
int count = counter.decrementAndGet();
- if(count == 0) {
+ if (count == 0) {
logFileIDReferenceCounts.remove(fileID);
}
}
@@ -391,7 +387,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
* totalBytes <= MAX_ALLOC_BUFFER_SIZE, so this can be cast to int
* without a problem.
*/
- checkpointFile.write(new byte[(int)totalBytes]);
+ checkpointFile.write(new byte[(int) totalBytes]);
} else {
byte[] initBuffer = new byte[MAX_ALLOC_BUFFER_SIZE];
long remainingBytes = totalBytes;
@@ -404,7 +400,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
* so casting to int is fine.
*/
if (remainingBytes > 0) {
- checkpointFile.write(initBuffer, 0, (int)remainingBytes);
+ checkpointFile.write(initBuffer, 0, (int) remainingBytes);
}
}
success = true;
@@ -412,7 +408,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
try {
checkpointFile.close();
} catch (IOException e) {
- if(success) {
+ if (success) {
throw e;
}
}
@@ -436,9 +432,9 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
}
int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L);
EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile)
- EventQueueBackingStoreFactory.get(file,capacity, "debug", false);
+ EventQueueBackingStoreFactory.get(file, capacity, "debug", false);
System.out.println("File Reference Counts"
- + backingStore.logFileIDReferenceCounts);
+ + backingStore.logFileIDReferenceCounts);
System.out.println("Queue Capacity " + backingStore.getCapacity());
System.out.println("Queue Size " + backingStore.getSize());
System.out.println("Queue Head " + backingStore.getHead());
@@ -447,7 +443,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
int fileID = (int) (value >>> 32);
int offset = (int) value;
System.out.println(index + ":" + Long.toHexString(value) + " fileID = "
- + fileID + ", offset = " + offset);
+ + fileID + ", offset = " + offset);
}
FlumeEventQueue queue =
new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile,
@@ -462,7 +458,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
int fileID = (int) (value >>> 32);
int offset = (int) value;
System.out.println(Long.toHexString(value) + " fileID = "
- + fileID + ", offset = " + offset);
+ + fileID + ", offset = " + offset);
}
}
SetMultimap<Long, Long> takeMap = queue.deserializeInflightTakes();
@@ -474,7 +470,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
int fileID = (int) (value >>> 32);
int offset = (int) value;
System.out.println(Long.toHexString(value) + " fileID = "
- + fileID + ", offset = " + offset);
+ + fileID + ", offset = " + offset);
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
index abd2ea3..71183aa 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
@@ -28,7 +28,6 @@ import com.google.common.base.Preconditions;
final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
-
private static final int INDEX_SIZE = 2;
private static final int INDEX_HEAD = 3;
private static final int INDEX_ACTIVE_LOG = 5;
@@ -55,6 +54,7 @@ final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
}
}
}
+
@Override
protected int getVersion() {
return Serialization.VERSION_2;