You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/05/14 14:05:28 UTC
[2/2] git commit: FALCON-10 Add findbugs plugin and fix findbugs
warnings for project. Contributed by Venkatesh Seetharam
FALCON-10 Add findbugs plugin and fix findbugs warnings for project. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/bfad459e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/bfad459e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/bfad459e
Branch: refs/heads/master
Commit: bfad459e175229a51bfd7946c8593f1b61967872
Parents: f15ef92
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Tue May 14 17:34:46 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Tue May 14 17:34:46 2013 +0530
----------------------------------------------------------------------
.gitignore | 1 +
CHANGES.txt | 16 +++
.../src/main/resources/falcon/findbugs-exclude.xml | 34 ++++++
client/pom.xml | 10 ++
.../main/java/org/apache/falcon/cli/CLIParser.java | 14 +-
.../main/java/org/apache/falcon/cli/FalconCLI.java | 16 ++-
.../org/apache/falcon/client/FalconClient.java | 51 +++++----
.../org/apache/falcon/entity/v0/EntityType.java | 1 +
.../java/org/apache/falcon/resource/APIResult.java | 1 +
.../org/apache/falcon/resource/EntityList.java | 3 +-
.../apache/falcon/resource/InstancesResult.java | 1 +
common/pom.xml | 5 +
.../java/org/apache/falcon/entity/EntityUtil.java | 7 +-
.../apache/falcon/entity/common/Configuration.java | 76 -------------
.../apache/falcon/entity/parser/EntityParser.java | 13 ++-
.../apache/falcon/expression/ExpressionHelper.java | 1 +
.../java/org/apache/falcon/group/FeedGroup.java | 9 +-
.../security/FalconSecurityConfiguration.java | 2 +-
.../apache/falcon/service/LogCleanupService.java | 2 +-
.../apache/falcon/util/ApplicationProperties.java | 16 ++-
.../org/apache/falcon/util/DeploymentUtil.java | 2 +-
.../org/apache/falcon/util/RuntimeProperties.java | 2 +-
.../apache/falcon/converter/OozieFeedMapper.java | 4 +-
messaging/pom.xml | 5 +
.../falcon/messaging/EntityInstanceMessage.java | 5 +-
.../converter/AbstractOozieEntityMapper.java | 31 +++---
.../java/org/apache/falcon/logging/LogMover.java | 10 +-
.../falcon/workflow/engine/OozieClientFactory.java | 4 +-
.../workflow/engine/OozieWorkflowEngine.java | 88 ++++++---------
.../org/apache/oozie/client/CustomOozieClient.java | 6 +-
pom.xml | 38 ++++++-
.../falcon/resource/AbstractEntityManager.java | 34 +++---
.../falcon/resource/admin/AdminResource.java | 2 +
.../falcon/resource/channel/AbstractChannel.java | 2 +-
.../apache/falcon/resource/channel/MethodKey.java | 2 +-
.../resource/proxy/InstanceManagerProxy.java | 11 +-
.../falcon/service/FalconTopicSubscriber.java | 26 ++---
.../apache/falcon/replication/FeedReplicator.java | 5 +-
.../apache/falcon/latedata/LateDataHandler.java | 7 +-
.../org/apache/falcon/rerun/event/RerunEvent.java | 3 +-
.../falcon/rerun/handler/LateRerunHandler.java | 7 +-
.../apache/falcon/rerun/handler/RetryHandler.java | 20 +--
.../apache/falcon/rerun/queue/InMemoryQueue.java | 51 +++++----
.../org/apache/falcon/retention/FeedEvictor.java | 1 -
webapp/src/main/java/org/apache/falcon/Debug.java | 3 +-
45 files changed, 334 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 6c05272..2525236 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,4 @@ target
#ActiveMQ
activemq-data
+build
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
new file mode 100644
index 0000000..7bf01d5
--- /dev/null
+++ b/CHANGES.txt
@@ -0,0 +1,16 @@
+Apache Falcon (incubating) Change log
+
+Trunk (Unreleased)
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ FALCON-10 Add findbugs plugin and fix findbugs warnings for project (Venkatesh
+ Seetharam via Srikanth Sundarrajan)
+
+ OPTIMIZATIONS
+
+ BUG FIXES
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/build-tools/src/main/resources/falcon/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/build-tools/src/main/resources/falcon/findbugs-exclude.xml b/build-tools/src/main/resources/falcon/findbugs-exclude.xml
new file mode 100644
index 0000000..0a7580d
--- /dev/null
+++ b/build-tools/src/main/resources/falcon/findbugs-exclude.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<FindBugsFilter>
+ <!-- These are generated by xjc compiler and hence excluded. -->
+ <Match>
+ <Or>
+ <Class name="~org.apache.falcon.entity.v0.feed.Validity" />
+ <Class name="~org.apache.falcon.entity.v0.process.Validity" />
+ </Or>
+ </Match>
+
+ <!--
+ Suppress the DM_DEFAULT_ENCODING warning: fixing it might give the impression that the Falcon code base is
+ "Internationalization" ready, but nothing has been done deliberately to guarantee that.
+ -->
+ <Match>
+ <Bug pattern="DM_DEFAULT_ENCODING" />
+ </Match>
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index d19516a..f602533 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -64,6 +64,16 @@
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>net.sourceforge.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/src/main/java/org/apache/falcon/cli/CLIParser.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/CLIParser.java b/client/src/main/java/org/apache/falcon/cli/CLIParser.java
index 18fc487..18c26e5 100644
--- a/client/src/main/java/org/apache/falcon/cli/CLIParser.java
+++ b/client/src/main/java/org/apache/falcon/cli/CLIParser.java
@@ -42,12 +42,12 @@ public class CLIParser {
/**
* Create a parser.
*
- * @param cliName name of the parser, for help purposes.
- * @param cliHelp help for the CLI.
+ * @param aCliName name of the parser, for help purposes.
+ * @param aCliHelp help for the CLI.
*/
- public CLIParser(String cliName, String[] cliHelp) {
- this.cliName = cliName;
- this.cliHelp = cliHelp;
+ public CLIParser(String aCliName, String[] aCliHelp) {
+ this.cliName = aCliName;
+ this.cliHelp = aCliHelp.clone();
}
/**
@@ -57,7 +57,7 @@ public class CLIParser {
* @param argsHelp command arguments help.
* @param commandHelp command description.
* @param commandOptions command options.
- * @param hasArguments
+ * @param hasArguments has args
*/
public void addCommand(String command, String argsHelp, String commandHelp, Options commandOptions,
boolean hasArguments) {
@@ -70,7 +70,7 @@ public class CLIParser {
/**
* Bean that represents a parsed command.
*/
- public final class Command {
+ public static final class Command {
private String name;
private CommandLine commandLine;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 7c60328..086bf4a 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
@@ -583,11 +584,16 @@ public class FalconCLI {
}
private Properties getClientProperties() throws IOException {
- Properties prop = new Properties();
- InputStream input = FalconCLI.class.getResourceAsStream(CLIENT_PROPERTIES);
- if (input != null) {
- prop.load(input);
+ InputStream inputStream = null;
+ try {
+ inputStream = FalconCLI.class.getResourceAsStream(CLIENT_PROPERTIES);
+ Properties prop = new Properties();
+ if (inputStream != null) {
+ prop.load(inputStream);
+ }
+ return prop;
+ } finally {
+ IOUtils.closeQuietly(inputStream);
}
- return prop;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index cf92b84..24a5bfe 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -22,6 +22,7 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
+import org.apache.commons.io.IOUtils;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
@@ -47,7 +48,7 @@ import java.util.Properties;
*/
public class FalconClient {
- protected static WebResource service;
+ private final WebResource service;
public static final String WS_HEADER_PREFIX = "header:";
private static final String REMOTE_USER = "Remote-User";
private static final String USER = System.getProperty("user.name");
@@ -67,8 +68,7 @@ public class FalconClient {
}
Client client = Client.create(new DefaultClientConfig());
setFalconTimeOut(client);
- FalconClient.service = client.resource(UriBuilder.fromUri(baseUrl)
- .build());
+ service = client.resource(UriBuilder.fromUri(baseUrl).build());
client.resource(UriBuilder.fromUri(baseUrl).build());
// addHeaders();
@@ -76,20 +76,24 @@ public class FalconClient {
private void setFalconTimeOut(Client client) throws IOException {
Properties prop = new Properties();
- InputStream input = FalconClient.class
- .getResourceAsStream("/client.properties");
int readTimeout;
int connectTimeout;
- if (input != null) {
- prop.load(input);
- readTimeout = prop.containsKey("falcon.read.timeout") ? Integer
- .parseInt(prop.getProperty("falcon.read.timeout")) : 180000;
- connectTimeout = prop.containsKey("falcon.connect.timeout") ? Integer
- .parseInt(prop.getProperty("falcon.connect.timeout"))
- : 180000;
- } else {
- readTimeout = 180000;
- connectTimeout = 180000;
+ InputStream inputStream = null;
+ try {
+ inputStream = FalconClient.class.getResourceAsStream("/client.properties");
+ if (inputStream != null) {
+ prop.load(inputStream);
+ readTimeout = prop.containsKey("falcon.read.timeout") ? Integer
+ .parseInt(prop.getProperty("falcon.read.timeout")) : 180000;
+ connectTimeout = prop.containsKey("falcon.connect.timeout") ? Integer
+ .parseInt(prop.getProperty("falcon.connect.timeout"))
+ : 180000;
+ } else {
+ readTimeout = 180000;
+ connectTimeout = 180000;
+ }
+ } finally {
+ IOUtils.closeQuietly(inputStream);
}
client.setConnectTimeout(connectTimeout);
client.setReadTimeout(readTimeout);
@@ -316,12 +320,17 @@ public class FalconClient {
StringBuilder buffer = new StringBuilder();
if (filePath != null) {
- BufferedReader in = new BufferedReader(new FileReader(filePath));
- String str;
- while ((str = in.readLine()) != null) {
- buffer.append(str).append("\n");
+ BufferedReader in = null;
+ try {
+ in = new BufferedReader(new FileReader(filePath));
+
+ String str;
+ while ((str = in.readLine()) != null) {
+ buffer.append(str).append("\n");
+ }
+ } finally {
+ IOUtils.closeQuietly(in);
}
- in.close();
}
String temp = (buffer.length() == 0) ? null : buffer.toString();
return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
@@ -390,7 +399,7 @@ public class FalconClient {
if (properties != null) {
buffer.append(properties);
}
- stream = new ByteArrayInputStream(buffer.toString().getBytes("UTF-8"));
+ stream = new ByteArrayInputStream(buffer.toString().getBytes());
return (buffer.length() == 0) ? null : stream;
}
// private ServletInputStream getServletInputStream(final InputStream
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
index 758308e..bc7b1f2 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
@@ -91,6 +91,7 @@ public enum EntityType {
return this != EntityType.CLUSTER;
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP"})
public String[] getImmutableProperties() {
return immutableProperties;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/src/main/java/org/apache/falcon/resource/APIResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/APIResult.java b/client/src/main/java/org/apache/falcon/resource/APIResult.java
index 2f1420d..79b8a1d 100644
--- a/client/src/main/java/org/apache/falcon/resource/APIResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/APIResult.java
@@ -35,6 +35,7 @@ import java.util.UUID;
*/
@XmlRootElement(name = "result")
@XmlAccessorType(XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class APIResult {
private Status status;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index 0344f2e..64dbae0 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -30,6 +30,7 @@ import javax.xml.bind.annotation.XmlRootElement;
*/
@XmlRootElement(name = "entities")
@XmlAccessorType(XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class EntityList {
@XmlElement(name = "entity")
@@ -80,7 +81,7 @@ public class EntityList {
@Override
public String toString() {
- StringBuffer buffer = new StringBuffer();
+ StringBuilder buffer = new StringBuilder();
for (EntityElement element : elements) {
buffer.append(element);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index 1822273..da0ccc5 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -27,6 +27,7 @@ import java.util.Date;
*/
//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
@XmlRootElement
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class InstancesResult extends APIResult {
/**
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index af74889..3f5a247 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -97,6 +97,11 @@
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-test-util</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>net.sourceforge.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index ba086f8..4897985 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -76,7 +76,7 @@ public final class EntityUtil {
public static TimeZone getTimeZone(String tzId) {
if (tzId == null) {
- throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
+ throw new IllegalArgumentException("Invalid TimeZone: Cannot be null.");
}
TimeZone tz = TimeZone.getTimeZone(tzId);
if (!tzId.equals("GMT") && tz.getID().equals("GMT")) {
@@ -379,9 +379,10 @@ public final class EntityUtil {
} catch (NoSuchMethodException e) {
try {
Map map = PropertyUtils.describe(obj);
- for (Object key : map.keySet()) {
+ for (Object entry : map.entrySet()) {
+ String key = (String)((Map.Entry)entry).getKey();
if (!key.equals("class")) {
- mapToProperties(map.get(key), name != null ? name + "." + key : (String) key, propMap,
+ mapToProperties(map.get(key), name != null ? name + "." + key : key, propMap,
filterProps);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/entity/common/Configuration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/Configuration.java b/common/src/main/java/org/apache/falcon/entity/common/Configuration.java
deleted file mode 100644
index 2e0e426..0000000
--- a/common/src/main/java/org/apache/falcon/entity/common/Configuration.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.common;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Property k/v.
- */
-public class Configuration implements Iterable<Map.Entry<String, String>>, Cloneable {
-
- private final Map<String, String> properties;
-
- public Configuration() {
- properties = new ConcurrentHashMap<String, String>();
- }
-
- public Configuration(Map<String, String> properties) {
- this.properties = properties;
- }
-
- public void addConfiguration(Configuration config) {
- for (Entry<String, String> entry : config) {
- properties.put(entry.getKey(), entry.getValue());
- }
- }
-
- public Configuration addAndReturnNewConfiguration(Configuration config) {
- Map<String, String> newProperties = new ConcurrentHashMap<String, String>(properties);
- for (Entry<String, String> entry : config) {
- newProperties.put(entry.getKey(), entry.getValue());
- }
- return new Configuration(newProperties);
- }
-
- public String getConf(String name) {
- return properties.get(name);
- }
-
- public void setConf(String name, String value) {
- properties.put(name, value);
- }
-
- public void setConf(String name, String value, String defaultValue) {
- if (value == null) {
- properties.put(name, defaultValue);
- } else {
- properties.put(name, value);
- }
- }
-
- @Override
- public Iterator<Entry<String, String>> iterator() {
- return properties.entrySet().iterator();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
index d24fafc..0df831d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
@@ -18,6 +18,7 @@
package org.apache.falcon.entity.parser;
+import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -58,15 +59,19 @@ public abstract class EntityParser<T extends Entity> {
* @throws FalconException
*/
public Entity parseAndValidate(String xmlString) throws FalconException {
- InputStream inputStream = new ByteArrayInputStream(xmlString.getBytes());
- Entity entity = parseAndValidate(inputStream);
- return entity;
+ InputStream inputStream = null;
+ try {
+ inputStream = new ByteArrayInputStream(xmlString.getBytes());
+ return parseAndValidate(inputStream);
+ } finally {
+ IOUtils.closeQuietly(inputStream);
+ }
}
/**
* Parses xml stream.
*
- * @param xmlStream
+ * @param xmlStream stream
* @return entity
* @throws FalconException
*/
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
index 5d70933..189fdd4 100644
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
@@ -93,6 +93,7 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
referenceDate.set(date);
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SF_SWITCH_FALLTHROUGH"})
private static Date getRelative(Date date, int boundary, int month, int day, int hour, int minute) {
Calendar dsInstanceCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
dsInstanceCal.setTime(date);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index e40ab13..5dca46f 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -65,7 +65,7 @@ public class FeedGroup {
@Override
public boolean equals(Object obj) {
- if (!(obj instanceof FeedGroup) || obj == null) {
+ if (obj == null || !(obj instanceof FeedGroup)) {
return false;
}
FeedGroup group = (FeedGroup) obj;
@@ -94,10 +94,7 @@ public class FeedGroup {
}
public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
- if (this.frequency.equals(feed.getFrequency())
- && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()))) {
- return true;
- }
- return false;
+ return this.frequency.equals(feed.getFrequency())
+ && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
index 7689672..b80ab6d 100644
--- a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
+++ b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
@@ -44,7 +44,7 @@ public class FalconSecurityConfiguration extends Configuration {
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
if (parent == null || appName.equals(SecurityConstants.FALCON_LOGIN)) {
- return SIMPLE_CONF;
+ return SIMPLE_CONF.clone();
} else {
return parent.getAppConfigurationEntry(appName);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
index 778be11..ca646d5 100644
--- a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
+++ b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
@@ -55,7 +55,7 @@ public class LogCleanupService implements FalconService {
}
- private class CleanupThread extends TimerTask {
+ private static class CleanupThread extends TimerTask {
private AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
private AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
index 5d670ef..3746729 100644
--- a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
@@ -79,16 +80,17 @@ public abstract class ApplicationProperties extends Properties {
}
}
- protected void loadProperties() throws FalconException {
+ void loadProperties() throws FalconException {
InputStream resource;
try {
if (location == LocationType.CLASSPATH) {
- if (getClass().getResource(propertyFile) != null) {
- LOG.info("Property file being loaded from " + getClass().getResource(propertyFile));
- resource = getClass().getResourceAsStream(propertyFile);
+ Class clazz = ApplicationProperties.class;
+ if (clazz.getResource(propertyFile) != null) {
+ LOG.info("Property file being loaded from " + clazz.getResource(propertyFile));
+ resource = clazz.getResourceAsStream(propertyFile);
} else {
- LOG.info("Property file being loaded from " + getClass().getResource("/" + propertyFile));
- resource = getClass().getResourceAsStream("/" + propertyFile);
+ LOG.info("Property file being loaded from " + clazz.getResource("/" + propertyFile));
+ resource = clazz.getResourceAsStream("/" + propertyFile);
}
} else {
resource = new FileInputStream(propertyFile);
@@ -120,7 +122,7 @@ public abstract class ApplicationProperties extends Properties {
resource.close();
}
}
- } catch (Exception e) {
+ } catch (IOException e) {
throw new FalconException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
index 8909c17..eca2912 100644
--- a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
@@ -37,7 +37,7 @@ public final class DeploymentUtil {
protected static final String CURRENT_COLO;
protected static final boolean EMBEDDED_MODE;
- protected static boolean prism = false;
+ private static boolean prism = false;
static {
DEFAULT_ALL_COLOS.add(DEFAULT_COLO);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java b/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
index cc87c8c..d4a2232 100644
--- a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
@@ -61,7 +61,7 @@ public final class RuntimeProperties extends ApplicationProperties {
/**
* Thread for loading properties periodically.
*/
- private final class DynamicLoader implements Runnable {
+ private static final class DynamicLoader implements Runnable {
private static final long REFRESH_DELAY = 300000L;
private static final int MAX_ITER = 20; //1hr
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index a409e44..d954202 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -148,7 +148,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
retentionWorkflow.setConfiguration(getCoordConfig(props));
retentionAction.setWorkflow(retentionWorkflow);
return retentionAction;
- } catch (Exception e) {
+ } catch (IOException e) {
throw new FalconException("Unable to create parent/retention workflow", e);
}
}
@@ -328,7 +328,5 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
}
}
return null;
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index f59719d..d9f0f52 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -57,6 +57,11 @@
</dependency>
<dependency>
+ <groupId>net.sourceforge.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index a2ced24..ddd6781 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -41,8 +41,7 @@ import java.util.Map;
public class EntityInstanceMessage {
private final Map<ARG, String> keyValueMap = new LinkedHashMap<ARG, String>();
- private static final Logger LOG = Logger
- .getLogger(EntityInstanceMessage.class);
+ private static final Logger LOG = Logger.getLogger(EntityInstanceMessage.class);
private static final String FALCON_ENTITY_TOPIC_NAME = "FALCON.ENTITY.TOPIC";
/**
@@ -211,7 +210,6 @@ public class EntityInstanceMessage {
LOG.debug("Returning instance paths for feed " + instancePaths[1]);
return instancePaths[1].split(",");
}
-
}
public String getFalconDate(String nominalTime) throws ParseException {
@@ -223,5 +221,4 @@ public class EntityInstanceMessage {
return falconFormat.format(nominalDate);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index f3ddb99..359365f 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.Tag;
+import org.apache.commons.io.IOUtils;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.ExternalId;
@@ -45,6 +46,7 @@ import org.apache.oozie.client.OozieClient;
import javax.xml.bind.*;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.HashMap;
@@ -74,10 +76,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
@Override
public boolean accept(Path path) {
- if (path.getName().startsWith("falcon")) {
- return true;
- }
- return false;
+ return path.getName().startsWith("falcon");
}
@Override
@@ -229,14 +228,6 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
return prop;
}
- protected org.apache.falcon.oozie.bundle.CONFIGURATION.Property createBundleProperty(String name, String value) {
- org.apache.falcon.oozie.bundle.CONFIGURATION.Property prop
- = new org.apache.falcon.oozie.bundle.CONFIGURATION.Property();
- prop.setName(name);
- prop.setValue(value);
- return prop;
- }
-
protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
throws FalconException {
@@ -315,26 +306,34 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
}
protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
+ InputStream resourceAsStream = null;
try {
+ resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
Unmarshaller unmarshaller = WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
@SuppressWarnings("unchecked")
- JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(this.getClass()
- .getResourceAsStream(template));
+ JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+ resourceAsStream);
return jaxbElement.getValue();
} catch (JAXBException e) {
throw new FalconException(e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
}
}
protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
+ InputStream resourceAsStream = null;
try {
+ resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
Unmarshaller unmarshaller = COORD_JAXB_CONTEXT.createUnmarshaller();
@SuppressWarnings("unchecked")
- JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>) unmarshaller
- .unmarshal(AbstractOozieEntityMapper.class.getResourceAsStream(template));
+ JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
+ unmarshaller.unmarshal(resourceAsStream);
return jaxbElement.getValue();
} catch (JAXBException e) {
throw new FalconException(e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index a1b0c32..4021464 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -63,7 +63,6 @@ public class LogMover extends Configured implements Tool {
private String subflowId;
private String runId;
private String logDir;
- private String status;
private String entityType;
}
@@ -92,7 +91,7 @@ public class LogMover extends Configured implements Tool {
if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())) {
// if replication wf
copyOozieLog(client, fs, path, jobInfo.getId());
- copyTTlogs(args, fs, path, jobInfo.getActions().get(2));
+ copyTTlogs(fs, path, jobInfo.getActions().get(2));
} else {
// if process wf
String subflowId = jobInfo.getExternalId();
@@ -102,7 +101,7 @@ public class LogMover extends Configured implements Tool {
for (WorkflowAction action : actions) {
if (action.getType().equals("pig")
|| action.getType().equals("java")) {
- copyTTlogs(args, fs, path, action);
+ copyTTlogs(fs, path, action);
} else {
LOG.info("Ignoring hadoop TT log for non-pig and non-java action:"
+ action.getName());
@@ -125,7 +124,7 @@ public class LogMover extends Configured implements Tool {
LOG.info("Copied oozie log to " + path);
}
- private void copyTTlogs(ARGS args, FileSystem fs, Path path,
+ private void copyTTlogs(FileSystem fs, Path path,
WorkflowAction action) throws Exception {
String ttLogURL = getTTlogURL(action.getExternalId());
if (ttLogURL != null) {
@@ -178,9 +177,7 @@ public class LogMover extends Configured implements Tool {
args.subflowId = cmd.getOptionValue("subflowId");
args.runId = cmd.getOptionValue("runId");
args.logDir = cmd.getOptionValue("logDir");
- args.status = cmd.getOptionValue("status");
args.entityType = cmd.getOptionValue("entityType");
-
}
private String getTTlogURL(String jobId) throws Exception {
@@ -208,5 +205,4 @@ public class LogMover extends Configured implements Tool {
connection.connect();
return connection.getInputStream();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index 175b832..b757531 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -52,10 +52,8 @@ public final class OozieClientFactory {
OozieClient ref = getClientRef(oozieUrl);
LOG.info("Caching Oozie client object for " + oozieUrl);
CACHE.putIfAbsent(oozieUrl, ref);
- return ref;
- } else {
- return CACHE.get(oozieUrl);
}
+ return CACHE.get(oozieUrl);
}
public static OozieClient get(String cluster) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index a75ad74..fcb9d80 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -86,8 +86,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
public void schedule(Entity entity) throws FalconException {
Map<String, BundleJob> bundleMap = findLatestBundle(entity);
List<String> schedClusters = new ArrayList<String>();
- for (String cluster : bundleMap.keySet()) {
- BundleJob bundleJob = bundleMap.get(cluster);
+ for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) {
+ String cluster = entry.getKey();
+ BundleJob bundleJob = entry.getValue();
if (bundleJob == MISSING || bundleJob.getStatus().equals(Job.Status.KILLED)) {
if (bundleJob != MISSING) {
LOG.warn("Bundle id: " + bundleJob.getId() + " is in killed state, so allowing schedule");
@@ -103,10 +104,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
ENGINE, entity);
Map<String, Properties> newFlows = builder.newWorkflowSchedule(
entity, schedClusters);
- for (String cluster : newFlows.keySet()) {
+ for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
+ String cluster = entry.getKey();
LOG.info("Scheduling " + entity.toShortString()
+ " on cluster " + cluster);
- scheduleEntity(cluster, newFlows.get(cluster), entity);
+ scheduleEntity(cluster, entry.getValue(), entity);
}
}
}
@@ -216,10 +218,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
Map<String, BundleJob> bundleMap = new HashMap<String, BundleJob>();
- for (String cluster : bundlesMap.keySet()) {
+ for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet()) {
+ String cluster = entry.getKey();
Date latest = null;
bundleMap.put(cluster, MISSING);
- for (BundleJob job : bundlesMap.get(cluster)) {
+ for (BundleJob job : entry.getValue()) {
if (latest == null || latest.before(job.getCreatedTime())) {
bundleMap.put(cluster, job);
latest = job.getCreatedTime();
@@ -481,60 +484,50 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private InstancesResult doJobAction(JobAction action, Entity entity,
Date start, Date end, Properties props) throws FalconException {
-
- Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(
- entity, start, end);
- List<String> clusterList = getIncludedClusters(props,
- FALCON_INSTANCE_ACTION_CLUSTERS);
- List<String> sourceClusterList = getIncludedClusters(props,
- FALCON_INSTANCE_SOURCE_CLUSTERS);
+ Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end);
+ List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
+ List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS);
APIResult.Status overallStatus = APIResult.Status.SUCCEEDED;
int instanceCount = 0;
List<Instance> instances = new ArrayList<Instance>();
- for (String cluster : actionsMap.keySet()) {
+ for (Map.Entry<String, List<CoordinatorAction>> entry : actionsMap.entrySet()) {
+ String cluster = entry.getKey();
if (clusterList.size() != 0 && !clusterList.contains(cluster)) {
continue;
}
- List<CoordinatorAction> actions = actionsMap.get(cluster);
+ List<CoordinatorAction> actions = entry.getValue();
String sourceCluster = null;
for (CoordinatorAction coordinatorAction : actions) {
if (entity.getEntityType() == EntityType.FEED) {
- sourceCluster = getSourceCluster(cluster,
- coordinatorAction, entity);
- if (sourceClusterList.size() != 0
- && !sourceClusterList.contains(sourceCluster)) {
+ sourceCluster = getSourceCluster(cluster, coordinatorAction, entity);
+ if (sourceClusterList.size() != 0 && !sourceClusterList.contains(sourceCluster)) {
continue;
}
}
String status = mapActionStatus(coordinatorAction.getStatus());
WorkflowJob jobInfo = null;
if (coordinatorAction.getExternalId() != null) {
- jobInfo = getWorkflowInfo(cluster,
- coordinatorAction.getExternalId());
+ jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
}
instanceCount++;
if (jobInfo != null) {
status = mapWorkflowStatus(jobInfo.getStatus());
try {
- status = performAction(action, props, cluster, status,
- jobInfo);
+ status = performAction(action, props, cluster, status, jobInfo);
} catch (FalconException e) {
- LOG.warn("Unable to perform action " + action
- + " on cluster ", e);
+ LOG.warn("Unable to perform action " + action + " on cluster ", e);
status = WorkflowStatus.ERROR.name();
overallStatus = APIResult.Status.PARTIAL;
}
}
if (action != OozieWorkflowEngine.JobAction.STATUS
&& coordinatorAction.getExternalId() != null) {
- jobInfo = getWorkflowInfo(cluster,
- coordinatorAction.getExternalId());
+ jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
}
- String nominalTimeStr = SchemaHelper
- .formatDateUTC(coordinatorAction.getNominalTime());
+ String nominalTimeStr = SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime());
InstancesResult.Instance instance = new InstancesResult.Instance(
cluster, nominalTimeStr, WorkflowStatus.valueOf(status));
if (jobInfo != null) {
@@ -543,18 +536,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
instance.logFile = jobInfo.getConsoleUrl();
instance.sourceCluster = sourceCluster;
}
- instance.details = coordinatorAction
- .getMissingDependencies();
+ instance.details = coordinatorAction.getMissingDependencies();
instances.add(instance);
}
}
if (instanceCount < 2 && overallStatus == APIResult.Status.PARTIAL) {
overallStatus = APIResult.Status.FAILED;
}
- InstancesResult instancesResult = new InstancesResult(overallStatus,
- action.name());
- instancesResult.setInstances(instances.toArray(new Instance[instances
- .size()]));
+ InstancesResult instancesResult = new InstancesResult(overallStatus, action.name());
+ instancesResult.setInstances(instances.toArray(new Instance[instances.size()]));
return instancesResult;
}
@@ -659,39 +649,33 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
Map<String, List<CoordinatorAction>> actionsMap = new HashMap<String, List<CoordinatorAction>>();
- for (String cluster : bundlesMap.keySet()) {
- List<BundleJob> bundles = bundlesMap.get(cluster);
+ for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet()) {
+ String cluster = entry.getKey();
+ List<BundleJob> bundles = entry.getValue();
OozieClient client = OozieClientFactory.get(cluster);
- List<CoordinatorJob> applicableCoords = getApplicableCoords(entity,
- client, start, end, bundles);
+ List<CoordinatorJob> applicableCoords = getApplicableCoords(entity, client, start, end, bundles);
List<CoordinatorAction> actions = new ArrayList<CoordinatorAction>();
for (CoordinatorJob coord : applicableCoords) {
- Frequency freq = createFrequency(coord.getFrequency(),
- coord.getTimeUnit());
+ Frequency freq = createFrequency(coord.getFrequency(), coord.getTimeUnit());
TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
- Date iterStart = EntityUtil.getNextStartTime(
- coord.getStartTime(), freq, tz, start);
+ Date iterStart = EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, start);
Date iterEnd = (coord.getNextMaterializedTime().before(end) ? coord.getNextMaterializedTime() : end);
while (!iterStart.after(iterEnd)) {
- int sequence = EntityUtil.getInstanceSequence(
- coord.getStartTime(), freq, tz, iterStart);
+ int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterStart);
String actionId = coord.getId() + "@" + sequence;
CoordinatorAction coordActionInfo = null;
try {
coordActionInfo = client.getCoordActionInfo(actionId);
} catch (OozieClientException e) {
- LOG.debug("Unable to get action for " + actionId + " "
- + e.getMessage());
+ LOG.debug("Unable to get action for " + actionId + " " + e.getMessage());
}
if (coordActionInfo != null) {
actions.add(coordActionInfo);
}
- Calendar startCal = Calendar.getInstance(EntityUtil
- .getTimeZone(coord.getTimeZone()));
+ Calendar startCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
startCal.setTime(iterStart);
- startCal.add(freq.getTimeUnit().getCalendarUnit(),
- coord.getFrequency());
+ startCal.add(freq.getTimeUnit().getCalendarUnit(), coord.getFrequency());
iterStart = startCal.getTime();
}
}
@@ -851,7 +835,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private Date offsetTime(Date date, int minute) {
- return new Date(date.getTime() + minute * 60 * 1000);
+ return new Date(1000L * 60 * minute + date.getTime());
}
private Date getCoordLastActionTime(CoordinatorJob coord) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
index 7634984..2f430ba 100644
--- a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
@@ -74,12 +74,10 @@ public class CustomOozieClient extends OozieClient {
protected Properties call(HttpURLConnection conn) throws IOException, OozieClientException {
conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
- Reader reader = new InputStreamReader(conn.getInputStream());
+ Reader reader = new InputStreamReader(conn.getInputStream(), "UTF-8");
JSONObject json = (JSONObject) JSONValue.parse(reader);
Properties props = new Properties();
- for (Object key : json.keySet()) {
- props.put(key, json.get(key));
- }
+ props.putAll(json);
return props;
} else {
handleError(conn);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 67fd237..829b2fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -271,6 +271,12 @@
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.2</version>
+ </dependency>
+
+ <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>1.9</version>
@@ -282,7 +288,6 @@
<version>1.9</version>
</dependency>
-
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
@@ -517,6 +522,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>net.sourceforge.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ <version>1.3.2</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -597,6 +607,11 @@
<artifactId>maven-site-plugin</artifactId>
<version>3.2</version>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.5.2</version>
+ </plugin>
</plugins>
</pluginManagement>
@@ -716,9 +731,11 @@
</dependencies>
<executions>
<execution>
+ <id>checkstyle-check</id>
<goals>
<goal>check</goal>
</goals>
+ <phase>verify</phase>
<configuration>
<consoleOutput>true</consoleOutput>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
@@ -728,6 +745,25 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <!--debug>true</debug-->
+ <xmlOutput>true</xmlOutput>
+ <excludeFilterFile>${basedir}/../build-tools/src/main/resources/falcon/findbugs-exclude.xml</excludeFilterFile>
+ <failOnError>true</failOnError>
+ </configuration>
+ <executions>
+ <execution>
+ <id>findbugs-check</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>verify</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index cad0d68..d5605df 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -164,7 +164,7 @@ public abstract class AbstractEntityManager {
* Post an entity XML with entity type. Validates the XML which can be
* Process, Feed or Dataendpoint
*
- * @param type
+ * @param type entity type
* @return APIResule -Succeeded or Failed
*/
public APIResult validate(HttpServletRequest request, String type) {
@@ -184,8 +184,8 @@ public abstract class AbstractEntityManager {
* Deletes a scheduled entity, a deleted entity is removed completely from
* execution pool.
*
- * @param type
- * @param entity
+ * @param type entity type
+ * @param entity entity name
* @return APIResult
*/
public APIResult delete(HttpServletRequest request, String type, String entity, String colo) {
@@ -287,7 +287,7 @@ public abstract class AbstractEntityManager {
private void canRemove(Entity entity) throws FalconException {
Pair<String, EntityType>[] referencedBy = EntityIntegrityChecker.referencedBy(entity);
if (referencedBy != null && referencedBy.length > 0) {
- StringBuffer messages = new StringBuffer();
+ StringBuilder messages = new StringBuilder();
for (Pair<String, EntityType> ref : referencedBy) {
messages.append(ref).append("\n");
}
@@ -371,14 +371,14 @@ public abstract class AbstractEntityManager {
/**
* Returns the status of requested entity.
*
- * @param type
- * @param entity
+ * @param type entity type
+ * @param entity entity name
* @return String
*/
public APIResult getStatus(String type, String entity, String colo) {
checkColo(colo);
- Entity entityObj = null;
+ Entity entityObj;
try {
entityObj = EntityUtil.getEntity(type, entity);
EntityType entityType = EntityType.valueOf(type.toUpperCase());
@@ -410,19 +410,19 @@ public abstract class AbstractEntityManager {
/**
* Returns dependencies.
*
- * @param type
- * @param entity
+ * @param type entity type
+ * @param entityName entity name
* @return EntityList
*/
- public EntityList getDependencies(String type, String entity) {
+ public EntityList getDependencies(String type, String entityName) {
try {
- Entity entityObj = EntityUtil.getEntity(type, entity);
+ Entity entityObj = EntityUtil.getEntity(type, entityName);
Set<Entity> dependents = EntityGraph.get().getDependents(entityObj);
Entity[] entities = dependents.toArray(new Entity[dependents.size()]);
- return new EntityList(entities == null ? new Entity[]{} : entities);
+ return new EntityList(entities);
} catch (Exception e) {
- LOG.error("Unable to get dependencies for entity " + entity + "(" + type + ")", e);
+ LOG.error("Unable to get dependencies for entityName " + entityName + "(" + type + ")", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
@@ -430,14 +430,14 @@ public abstract class AbstractEntityManager {
/**
* Returns the list of entities registered of a given type.
*
- * @param type
+ * @param type entity type
* @return String
*/
public EntityList getDependencies(String type) {
try {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Collection<String> entityNames = configStore.getEntities(entityType);
- if (entityNames == null || entityNames.equals("")) {
+ if (entityNames == null || entityNames.isEmpty()) {
return new EntityList(new Entity[]{});
}
Entity[] entities = new Entity[entityNames.size()];
@@ -455,8 +455,8 @@ public abstract class AbstractEntityManager {
/**
* Returns the entity definition as an XML based on name.
*
- * @param type
- * @param entityName
+ * @param type entity type
+ * @param entityName entity name
* @return String
*/
public String getEntityDefinition(String type, String entityName) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
index de14547..a2b9f61 100644
--- a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
@@ -111,6 +111,7 @@ public class AdminResource {
//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
@XmlRootElement(name = "property")
@XmlAccessorType(XmlAccessType.FIELD)
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
private static class Property {
public String key;
public String value;
@@ -120,6 +121,7 @@ public class AdminResource {
//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
@XmlRootElement(name = "properties")
@XmlAccessorType(XmlAccessType.FIELD)
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"})
private static class PropertyList {
public List<Property> properties;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
index d9ed6af..d01dff0 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
@@ -40,7 +40,7 @@ public abstract class AbstractChannel implements Channel {
item.getParameterTypes());
if (methodKey.equals(itemKey)) {
methods.putIfAbsent(methodKey, item);
- return item;
+ return methods.get(methodKey);
}
}
throw new FalconException("Lookup for " + methodKey
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java b/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
index 446bcc6..97aaff0 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
@@ -40,7 +40,7 @@ public class MethodKey {
public MethodKey(String name, Class[] args) {
this.name = name;
- argClasses = args;
+ argClasses = args.clone();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index dfc02a1..643f98b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -220,12 +220,8 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
Map<String, InstancesResult> results = new HashMap<String, InstancesResult>();
for (String colo : colos) {
try {
- APIResult resultHolder = doExecute(colo);
- if (resultHolder instanceof InstancesResult) {
- results.put(colo, (InstancesResult) resultHolder);
- } else {
- throw new FalconException(resultHolder.getMessage());
- }
+ InstancesResult resultHolder = doExecute(colo);
+ results.put(colo, resultHolder);
} catch (FalconException e) {
results.put(colo, new InstancesResult(APIResult.Status.FAILED,
e.getClass().getName() + "::" + e.getMessage(),
@@ -252,7 +248,8 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
StringBuilder requestIds = new StringBuilder();
List<Instance> instances = new ArrayList<Instance>();
int statusCount = 0;
- for (String colo : results.keySet()) {
+ for (Map.Entry<String, InstancesResult> entry : results.entrySet()) {
+ String colo = entry.getKey();
InstancesResult result = results.get(colo);
message.append(colo).append('/').append(result.getMessage()).append('\n');
requestIds.append(colo).append('/').append(result.getRequestId()).append('\n');
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index e20f091..f11998a 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -84,21 +84,16 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
try {
debug(mapMessage);
String cluster = mapMessage.getString(ARG.cluster.getArgName());
- String entityName = mapMessage.getString(ARG.entityName
- .getArgName());
- String entityType = mapMessage.getString(ARG.entityType
- .getArgName());
- String workflowId = mapMessage.getString(ARG.workflowId
- .getArgName());
+ String entityName = mapMessage.getString(ARG.entityName.getArgName());
+ String entityType = mapMessage.getString(ARG.entityType.getArgName());
+ String workflowId = mapMessage.getString(ARG.workflowId.getArgName());
String runId = mapMessage.getString(ARG.runId.getArgName());
- String nominalTime = mapMessage.getString(ARG.nominalTime
- .getArgName());
+ String nominalTime = mapMessage.getString(ARG.nominalTime.getArgName());
String status = mapMessage.getString(ARG.status.getArgName());
String operation = mapMessage.getString(ARG.operation.getArgName());
AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- InstancesResult result = wfEngine
- .getJobDetails(cluster, workflowId);
+ InstancesResult result = wfEngine.getJobDetails(cluster, workflowId);
Date startTime = result.getInstances()[0].startTime;
Date endTime = result.getInstances()[0].endTime;
Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
@@ -121,12 +116,13 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
notifySLAService(cluster, entityName, entityType, nominalTime, duration);
}
- } catch (Exception ignore) {
- LOG.info(
- "Error in onMessage for subscriber of topic: "
- + this.toString(), ignore);
+ } catch (JMSException e) {
+ LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
+ } catch (FalconException e) {
+ LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
+ } catch (Exception e) {
+ LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
}
-
}
private void notifySLAService(String cluster, String entityName,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index efb5691..5c51b5f 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -106,10 +106,11 @@ public class FeedReplicator extends Configured implements Tool {
part = index;
break;
}
- String result = "";
+ StringBuilder resultBuffer = new StringBuilder();
for (int index = 0; index <= part; index++) {
- result += (patterns[index] + "/");
+ resultBuffer.append(patterns[index]).append("/");
}
+ String result = resultBuffer.toString();
return result.substring(0, result.lastIndexOf('/'));
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 95a3511..5b758b8 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -95,8 +95,7 @@ public class LateDataHandler extends Configured implements Tool {
OutputStream out = file.getFileSystem(getConf()).create(file);
for (Map.Entry<String, Long> entry : map.entrySet()) {
- out.write((entry.getKey() + "=" + entry.getValue() + "\n")
- .getBytes());
+ out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
}
out.close();
return 0;
@@ -114,8 +113,8 @@ public class LateDataHandler extends Configured implements Tool {
throws Exception {
StringBuilder buffer = new StringBuilder();
- BufferedReader in = new BufferedReader(new InputStreamReader(file
- .getFileSystem(conf).open(file)));
+ BufferedReader in = new BufferedReader(new InputStreamReader(
+ file.getFileSystem(conf).open(file)));
String line;
try {
Map<String, Long> recorded = new LinkedHashMap<String, Long>();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 9ae6458..0dcc93d 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
/**
* Event representing a rerun.
*/
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EQ_COMPARETO_USE_OBJECT_EQUALS"})
public class RerunEvent implements Delayed {
protected static final String SEP = "*";
@@ -86,6 +87,7 @@ public class RerunEvent implements Delayed {
return entityType;
}
+
@Override
public int compareTo(Delayed o) {
RerunEvent event = (RerunEvent) o;
@@ -116,5 +118,4 @@ public class RerunEvent implements Delayed {
return null;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 40c5d1c..1e4bd25 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -118,7 +118,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
PolicyType latePolicy = lateProcess.getPolicy();
Date cutOffTime = getCutOffTime(entity, nominalTime);
Date now = new Date();
- Long wait = null;
+ Long wait;
if (now.after(cutOffTime)) {
LOG.warn("Feed Cut Off time: "
@@ -144,7 +144,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
ExpressionHelper evaluator = ExpressionHelper.get();
Date instanceStart = EntityUtil.parseDateUTC(nominalTime);
ExpressionHelper.setReferenceDate(instanceStart);
- Date endTime = new Date();
+ Date endTime;
Date feedCutOff = new Date(0);
if (entity.getEntityType() == EntityType.FEED) {
if (((Feed) entity).getLateArrival() == null) {
@@ -170,6 +170,9 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
break;
}
}
+ if (feed == null) {
+ throw new IllegalStateException("No such feed: " + lp.getInput());
+ }
if (feed.getLateArrival() == null) {
LOG.debug("Feed's " + feed.getName()
+ " late arrival cut-off is not configured, ignoring this feed");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 5bd8fd8..2b41a7c 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -38,9 +38,8 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
AbstractRerunHandler<RetryEvent, M> {
@Override
- public void handleRerun(String cluster, String entityType,
- String entityName, String nominalTime, String runId, String wfId,
- long msgReceivedTime) {
+ public void handleRerun(String cluster, String entityType, String entityName,
+ String nominalTime, String runId, String wfId, long msgReceivedTime) {
try {
Entity entity = getEntity(entityType, entityName);
Retry retry = getRetry(entity);
@@ -57,10 +56,8 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
int intRunId = Integer.parseInt(runId);
if (attempts > intRunId) {
- AbstractRerunPolicy rerunPolicy = RerunPolicyFactory
- .getRetryPolicy(policy);
- long delayTime = rerunPolicy.getDelay(delay,
- Integer.parseInt(runId));
+ AbstractRerunPolicy rerunPolicy = RerunPolicyFactory.getRetryPolicy(policy);
+ long delayTime = rerunPolicy.getDelay(delay, Integer.parseInt(runId));
RetryEvent event = new RetryEvent(cluster, wfId,
msgReceivedTime, delayTime, entityType, entityName,
nominalTime, intRunId, attempts, 0);
@@ -76,13 +73,10 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
"All retry attempt failed out of configured: "
+ attempts + " attempt for entity instance::");
}
- } catch (Exception e) {
- LOG.error("Error during retry of entity instance " + entityName
- + ":" + nominalTime, e);
- GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,
- wfId, runId, e.getMessage());
+ } catch (FalconException e) {
+ LOG.error("Error during retry of entity instance " + entityName + ":" + nominalTime, e);
+ GenericAlert.alertRetryFailed(entityType, entityName, nominalTime, wfId, runId, e.getMessage());
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
index 06feb92..8234d8a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
@@ -18,6 +18,7 @@
package org.apache.falcon.rerun.queue;
import org.apache.falcon.FalconException;
+import org.apache.commons.io.IOUtils;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.rerun.event.RerunEvent;
import org.apache.falcon.rerun.event.RerunEventFactory;
@@ -36,7 +37,11 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
public static final Logger LOG = Logger.getLogger(DelayedQueue.class);
protected DelayQueue<T> delayQueue = new DelayQueue<T>();
- private File serializeFilePath;
+ private final File serializeFilePath;
+
+ public InMemoryQueue(File serializeFilePath) {
+ this.serializeFilePath = serializeFilePath;
+ }
@Override
public boolean offer(T event) {
@@ -59,10 +64,6 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
return event;
}
- public InMemoryQueue(File serializeFilePath) {
- this.serializeFilePath = serializeFilePath;
- }
-
public void populateQueue(List<T> events) {
for (T event : events) {
delayQueue.offer(event);
@@ -82,17 +83,18 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
private void beforeRetry(T event) {
File retryFile = getRetryFile(serializeFilePath, event);
+ BufferedWriter out = null;
try {
- BufferedWriter out = new BufferedWriter(new FileWriter(retryFile,
- true));
+ out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(retryFile, true)));
out.write(event.toString());
out.newLine();
out.close();
} catch (IOException e) {
- LOG.warn(
- "Unable to write entry for process-instance: "
+ LOG.warn("Unable to write entry for process-instance: "
+ event.getEntityName() + ":"
+ event.getInstance(), e);
+ } finally {
+ IOUtils.closeQuietly(out);
}
}
@@ -121,24 +123,25 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
private List<T> bootstrap() {
List<T> rerunEvents = new ArrayList<T>();
- for (File rerunFile : this.serializeFilePath.listFiles()) {
- try {
- BufferedReader reader = new BufferedReader(new FileReader(
- rerunFile));
- String line;
- while ((line = reader.readLine()) != null) {
- line.split("");
- T event = new RerunEventFactory<T>().getRerunEvent(
- rerunFile.getName(), line);
- rerunEvents.add(event);
+ File[] files = serializeFilePath.listFiles();
+ if (files != null) {
+ for (File rerunFile : files) {
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(rerunFile), "UTF-8"));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ T event = new RerunEventFactory<T>().getRerunEvent(
+ rerunFile.getName(), line);
+ rerunEvents.add(event);
+ }
+ } catch (Exception e) {
+ LOG.warn("Not able to read rerun entry " + rerunFile.getAbsolutePath(), e);
+ } finally {
+ IOUtils.closeQuietly(reader);
}
- } catch (Exception e) {
- LOG.warn(
- "Not able to read rerun entry "
- + rerunFile.getAbsolutePath(), e);
}
}
-
return rerunEvents;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bfad459e/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 155c847..6df7488 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -133,7 +133,6 @@ public class FeedEvictor extends Configured implements Tool {
instancePaths.append(path).append(",");
}
}
-
}
private void logInstancePaths(Path path) throws IOException {