You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:50:25 UTC
[09/47] Fixes for Checkstyle
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/process/src/test/resources/config/late/late-process2.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/late/late-process2.xml b/process/src/test/resources/config/late/late-process2.xml
index a9d3576..bc507ad 100644
--- a/process/src/test/resources/config/late/late-process2.xml
+++ b/process/src/test/resources/config/late/late-process2.xml
@@ -16,8 +16,8 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
-<process name="late-process2" xmlns="uri:falcon:process:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <!-- where -->
+<process name="late-process2" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
<clusters>
<cluster name="late-cluster">
<validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
@@ -32,25 +32,26 @@
<!-- what -->
<inputs>
- <input name="impression" feed="late-feed1" start-instance="today(0,0)" end-instance="today(0,2)" />
- <input name="clicks" feed="late-feed2" start-instance="yesterday(0,0)" end-instance="today(0,0)" partition="*/US"/>
+ <input name="impression" feed="late-feed1" start-instance="today(0,0)" end-instance="today(0,2)"/>
+ <input name="clicks" feed="late-feed2" start-instance="yesterday(0,0)" end-instance="today(0,0)"
+ partition="*/US"/>
</inputs>
<outputs>
- <output name="clicksummary" feed="late-feed3" instance="today(0,0)" />
+ <output name="clicksummary" feed="late-feed3" instance="today(0,0)"/>
</outputs>
<!-- how -->
<properties>
- <property name="procprop" value="procprop"/>
+ <property name="procprop" value="procprop"/>
</properties>
-
- <workflow engine="oozie" path="/user/guest/workflow" />
- <retry policy="periodic" delay="hours(10)" attempts="3" />
+ <workflow engine="oozie" path="/user/guest/workflow"/>
+
+ <retry policy="periodic" delay="hours(10)" attempts="3"/>
<late-process policy="exp-backoff" delay="hours(1)">
- <late-input feed="impression" workflow-path="hdfs://impression/late/workflow" />
- <late-input feed="clicks" workflow-path="hdfs://clicks/late/workflow" />
+ <late-input feed="impression" workflow-path="hdfs://impression/late/workflow"/>
+ <late-input feed="clicks" workflow-path="hdfs://clicks/late/workflow"/>
</late-process>
</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/process/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-0.1.xml b/process/src/test/resources/config/process/process-0.1.xml
index b5a7f50..91d5e0f 100644
--- a/process/src/test/resources/config/process/process-0.1.xml
+++ b/process/src/test/resources/config/process/process-0.1.xml
@@ -4,11 +4,11 @@
License. You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed
under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing
permissions and ~ limitations under the License. -->
-<process name="sample" xmlns="uri:falcon:process:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+<process name="sample" xmlns="uri:falcon:process:0.1">
<!-- where -->
<clusters>
<cluster name="corp">
- <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z" />
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
</cluster>
</clusters>
@@ -20,26 +20,26 @@
<!-- what -->
<inputs>
- <input name="impression" feed="impressions" start="today(0,0)" end="today(0,2)" />
- <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US" />
+ <input name="impression" feed="impressions" start="today(0,0)" end="today(0,2)"/>
+ <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US"/>
</inputs>
<outputs>
- <output name="clicksummary" feed="impressions" instance="today(0,0)" />
+ <output name="clicksummary" feed="impressions" instance="today(0,0)"/>
</outputs>
<!-- how -->
<properties>
- <property name="procprop" value="procprop" />
- <property name="mapred.job.priority" value="LOW" />
+ <property name="procprop" value="procprop"/>
+ <property name="mapred.job.priority" value="LOW"/>
</properties>
- <workflow engine="oozie" path="/user/guest/workflow" />
+ <workflow engine="oozie" path="/user/guest/workflow"/>
- <retry policy="periodic" delay="hours(10)" attempts="3" />
+ <retry policy="periodic" delay="hours(10)" attempts="3"/>
<late-process policy="exp-backoff" delay="hours(1)">
- <late-input input="impression" workflow-path="hdfs://impression/late/workflow" />
- <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow" />
+ <late-input input="impression" workflow-path="hdfs://impression/late/workflow"/>
+ <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
</late-process>
</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
index 969a59c..7dfd406 100644
--- a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
@@ -30,6 +30,7 @@ import java.io.IOException;
public class CustomReplicator extends DistCp {
private static Logger LOG = Logger.getLogger(CustomReplicator.class);
+
/**
* Public Constructor. Creates DistCp object with specified input-parameters.
* (E.g. source-paths, target-location, etc.)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index a693d75..fc0b5ac 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -17,16 +17,7 @@
*/
package org.apache.falcon.replication;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -39,31 +30,36 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
public class FeedReplicator extends Configured implements Tool {
- private static Logger LOG = Logger.getLogger(FeedReplicator.class);
+ private static Logger LOG = Logger.getLogger(FeedReplicator.class);
public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new FeedReplicator(), args);
- }
+ ToolRunner.run(new Configuration(), new FeedReplicator(), args);
+ }
- @Override
- public int run(String[] args) throws Exception {
+ @Override
+ public int run(String[] args) throws Exception {
DistCpOptions options = getDistCpOptions(args);
-
+
Configuration conf = this.getConf();
- // inject wf configs
- Path confPath = new Path("file:///"
- + System.getProperty("oozie.action.conf.xml"));
-
- LOG.info(confPath + " found conf ? "
- + confPath.getFileSystem(conf).exists(confPath));
- conf.addResource(confPath);
-
- DistCp distCp = new CustomReplicator(conf, options);
- LOG.info("Started DistCp");
- distCp.execute();
+ // inject wf configs
+ Path confPath = new Path("file:///"
+ + System.getProperty("oozie.action.conf.xml"));
+
+ LOG.info(confPath + " found conf ? "
+ + confPath.getFileSystem(conf).exists(confPath));
+ conf.addResource(confPath);
+
+ DistCp distCp = new CustomReplicator(conf, options);
+ LOG.info("Started DistCp");
+ distCp.execute();
Path targetPath = options.getTargetPath();
FileSystem fs = targetPath.getFileSystem(getConf());
@@ -79,28 +75,30 @@ public class FeedReplicator extends Configured implements Tool {
String fixedPath = getFixedPath(relativePath);
FileStatus[] files = fs.globStatus(new Path(targetPath.toString() + "/" + fixedPath));
- if (files != null) {
- for (FileStatus file : files) {
- fs.create(new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME)).close();
- LOG.info("Created " + new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME));
- }
- } else {
- LOG.info("No files present in path: "
- + new Path(targetPath.toString() + "/" + fixedPath)
- .toString());
- }
- LOG.info("Completed DistCp");
- return 0;
- }
+ if (files != null) {
+ for (FileStatus file : files) {
+ fs.create(new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME)).close();
+ LOG.info("Created " + new Path(file.getPath(), FileOutputCommitter.SUCCEEDED_FILE_NAME));
+ }
+ } else {
+ LOG.info("No files present in path: "
+ + new Path(targetPath.toString() + "/" + fixedPath)
+ .toString());
+ }
+ LOG.info("Completed DistCp");
+ return 0;
+ }
private String getFixedPath(String relativePath) throws IOException {
String[] patterns = relativePath.split("/");
int part = patterns.length - 1;
for (int index = patterns.length - 1; index >= 0; index--) {
String pattern = patterns[index];
- if (pattern.isEmpty()) continue;
+ if (pattern.isEmpty()) {
+ continue;
+ }
Pattern r = FilteredCopyListing.getRegEx(pattern);
- if (!r.toString().equals("(" + pattern + "/)|(" + pattern + "$)")) {
+ if (!r.toString().equals("(" + pattern + "/)|(" + pattern + "$)")) {
continue;
}
part = index;
@@ -114,42 +112,42 @@ public class FeedReplicator extends Configured implements Tool {
}
public DistCpOptions getDistCpOptions(String[] args) throws ParseException {
- Options options = new Options();
- Option opt;
- opt = new Option("maxMaps", true,
- "max number of maps to use for this copy");
- opt.setRequired(true);
- options.addOption(opt);
+ Options options = new Options();
+ Option opt;
+ opt = new Option("maxMaps", true,
+ "max number of maps to use for this copy");
+ opt.setRequired(true);
+ options.addOption(opt);
opt = new Option("sourcePaths", true,
- "comma separtated list of source paths to be copied");
- opt.setRequired(true);
- options.addOption(opt);
+ "comma separtated list of source paths to be copied");
+ opt.setRequired(true);
+ options.addOption(opt);
opt = new Option("targetPath", true, "target path");
- opt.setRequired(true);
- options.addOption(opt);
+ opt.setRequired(true);
+ options.addOption(opt);
- CommandLine cmd = new GnuParser().parse(options, args);
- String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
- List<Path> srcPaths = getPaths(paths);
- String trgPath = cmd.getOptionValue("targetPath").trim();
+ CommandLine cmd = new GnuParser().parse(options, args);
+ String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
+ List<Path> srcPaths = getPaths(paths);
+ String trgPath = cmd.getOptionValue("targetPath").trim();
- DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(
- trgPath));
+ DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(
+ trgPath));
distcpOptions.setSyncFolder(true);
- distcpOptions.setBlocking(true);
- distcpOptions
- .setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
-
- return distcpOptions;
- }
-
- private List<Path> getPaths(String[] paths) {
- List<Path> listPaths = new ArrayList<Path>();
- for (String path : paths) {
- listPaths.add(new Path(path));
- }
- return listPaths;
- }
+ distcpOptions.setBlocking(true);
+ distcpOptions
+ .setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
+
+ return distcpOptions;
+ }
+
+ private List<Path> getPaths(String[] paths) {
+ List<Path> listPaths = new ArrayList<Path>();
+ for (String path : paths) {
+ listPaths.add(new Path(path));
+ }
+ return listPaths;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
index e1b6276..c1698e3 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
@@ -32,12 +32,18 @@ import java.util.regex.Pattern;
public class FilteredCopyListing extends SimpleCopyListing {
private static final Logger LOG = Logger.getLogger(FilteredCopyListing.class);
- /** Default pattern character: Escape any special meaning. */
- private static final char PAT_ESCAPE = '\\';
- /** Default pattern character: Any single character. */
- private static final char PAT_ANY = '.';
- /** Default pattern character: Character set close. */
- private static final char PAT_SET_CLOSE = ']';
+ /**
+ * Default pattern character: Escape any special meaning.
+ */
+ private static final char PAT_ESCAPE = '\\';
+ /**
+ * Default pattern character: Any single character.
+ */
+ private static final char PAT_ANY = '.';
+ /**
+ * Default pattern character: Character set close.
+ */
+ private static final char PAT_SET_CLOSE = ']';
private Pattern regex;
@@ -55,7 +61,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
@Override
protected boolean shouldCopy(Path path, DistCpOptions options) {
- if (path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) return false;
+ if (path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+ return false;
+ }
return regex == null || regex.matcher(path.toString()).find();
}
@@ -74,8 +82,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
// Validate the pattern
len = filePattern.length();
- if (len == 0)
+ if (len == 0) {
return null;
+ }
setOpen = 0;
setRange = false;
@@ -89,8 +98,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
if (pCh == PAT_ESCAPE) {
fileRegex.append(pCh);
i++;
- if (i >= len)
+ if (i >= len) {
error("An escaped character does not present", filePattern, i);
+ }
pCh = filePattern.charAt(i);
} else if (isJavaRegexSpecialChar(pCh)) {
fileRegex.append(PAT_ESCAPE);
@@ -121,8 +131,9 @@ public class FilteredCopyListing extends SimpleCopyListing {
error("Incomplete character set range", filePattern, i);
} else if (pCh == PAT_SET_CLOSE && setOpen > 0) {
// End of a character set
- if (setOpen < 2)
+ if (setOpen < 2) {
error("Unexpected end of set", filePattern, i);
+ }
setOpen = 0;
} else if (setOpen > 0) {
// Normal character, or the end of a character set range
@@ -143,7 +154,7 @@ public class FilteredCopyListing extends SimpleCopyListing {
private static void error(String s, String pattern, int pos) throws IOException {
throw new IOException("Illegal file pattern: "
- +s+ " for glob "+ pattern + " at " + pos);
+ + s + " for glob " + pattern + " at " + pos);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
index f2ccfd8..1935e51 100644
--- a/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
+++ b/replication/src/test/java/org/apache/falcon/repliation/FeedReplicatorTest.java
@@ -17,9 +17,9 @@
*/
package org.apache.falcon.repliation;
+import org.apache.falcon.replication.FeedReplicator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.falcon.replication.FeedReplicator;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,9 +37,9 @@ public class FeedReplicatorTest {
* <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
*/
FeedReplicator replicator = new FeedReplicator();
- DistCpOptions options = replicator.getDistCpOptions(new String[] { "true", "-maxMaps", "5", "-sourcePaths",
- "hdfs://localhost:8020/tmp/", "-targetPath",
- "hdfs://localhost1:8020/tmp/" });
+ DistCpOptions options = replicator.getDistCpOptions(new String[]{"true", "-maxMaps", "5", "-sourcePaths",
+ "hdfs://localhost:8020/tmp/", "-targetPath",
+ "hdfs://localhost1:8020/tmp/"});
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
index dc17cc2..5054bf8 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
@@ -34,7 +34,9 @@ import org.testng.annotations.Test;
import java.io.DataOutputStream;
import java.net.URI;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
public class FilteredCopyListingTest {
@@ -69,8 +71,7 @@ public class FilteredCopyListingTest {
fileSystem = FileSystem.getLocal(new Configuration());
fileSystem.mkdirs(new Path(path));
recordInExpectedValues(path);
- }
- finally {
+ } finally {
IOUtils.cleanup(null, fileSystem);
}
}
@@ -80,8 +81,7 @@ public class FilteredCopyListingTest {
try {
fileSystem = FileSystem.getLocal(new Configuration());
fileSystem.delete(new Path(path), true);
- }
- finally {
+ } finally {
IOUtils.cleanup(null, fileSystem);
}
}
@@ -93,8 +93,7 @@ public class FilteredCopyListingTest {
fileSystem = FileSystem.getLocal(new Configuration());
outputStream = fileSystem.create(new Path(path), true, 10);
recordInExpectedValues(path);
- }
- finally {
+ } finally {
IOUtils.cleanup(null, fileSystem, outputStream);
}
}
@@ -209,7 +208,7 @@ public class FilteredCopyListingTest {
private void verifyContents(Path listingPath, int expected) throws Exception {
SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.getLocal(new Configuration()),
listingPath, new Configuration());
- Text key = new Text();
+ Text key = new Text();
FileStatus value = new FileStatus();
Map<String, String> actualValues = new HashMap<String, String>();
while (reader.next(key, value)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 73dc882..00a2d87 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -18,19 +18,7 @@
package org.apache.falcon.latedata;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -40,131 +28,139 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
+import java.io.*;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
public class LateDataHandler extends Configured implements Tool {
- private static Logger LOG = Logger.getLogger(LateDataHandler.class);
-
- static PrintStream stream = System.out;
-
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Path confPath = new Path("file:///"
- + System.getProperty("oozie.action.conf.xml"));
-
- LOG.info(confPath + " found ? "
- + confPath.getFileSystem(conf).exists(confPath));
- conf.addResource(confPath);
- ToolRunner.run(conf, new LateDataHandler(), args);
- }
-
- private static CommandLine getCommand(String[] args) throws ParseException {
- Options options = new Options();
-
- Option opt = new Option("out", true, "Out file name");
- opt.setRequired(true);
- options.addOption(opt);
- opt = new Option("paths", true,
- "Comma separated path list, further separated by #");
- opt.setRequired(true);
- options.addOption(opt);
- opt = new Option("falconInputFeeds", true,
- "Input feed names, further separated by #");
- opt.setRequired(true);
- options.addOption(opt);
-
- return new GnuParser().parse(options, args);
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- CommandLine command = getCommand(args);
-
- Path file = new Path(command.getOptionValue("out"));
- Map<String, Long> map = new LinkedHashMap<String, Long>();
- String pathStr = getOptionValue(command, "paths");
- if(pathStr == null)
- return 0;
-
- String[] pathGroups = pathStr.split("#");
- String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split(
- "#");
- for (int index = 0; index < pathGroups.length; index++) {
- long usage = 0;
- for (String pathElement : pathGroups[index].split(",")) {
- Path inPath = new Path(pathElement);
- usage += usage(inPath, getConf());
- }
- map.put(inputFeeds[index], usage);
- }
- LOG.info("MAP data: " + map);
-
- OutputStream out = file.getFileSystem(getConf()).create(file);
- for (Map.Entry<String, Long> entry : map.entrySet()) {
- out.write((entry.getKey() + "=" + entry.getValue() + "\n")
- .getBytes());
- }
- out.close();
- return 0;
- }
-
- private String getOptionValue(CommandLine command, String option) {
- String value = command.getOptionValue(option);
- if(value.equals("null"))
- return null;
- return value;
- }
-
- public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
- throws Exception {
-
- StringBuffer buffer = new StringBuffer();
- BufferedReader in = new BufferedReader(new InputStreamReader(file
- .getFileSystem(conf).open(file)));
- String line;
- try {
- Map<String, Long> recorded = new LinkedHashMap<String, Long>();
- while ((line = in.readLine()) != null) {
- if (line.isEmpty())
- continue;
- int index = line.indexOf('=');
- String key = line.substring(0, index);
- long size = Long.parseLong(line.substring(index + 1));
- recorded.put(key, size);
- }
-
- for (Map.Entry<String, Long> entry : map.entrySet()) {
- if (recorded.get(entry.getKey()) == null) {
- LOG.info("No matching key " + entry.getKey());
- continue;
- }
- if (!recorded.get(entry.getKey()).equals(entry.getValue())) {
- LOG.info("Recorded size:"+recorded.get(entry.getKey())+" is different from new size" + entry.getValue());
- buffer.append(entry.getKey()).append(',');
- }
- }
- if (buffer.length() == 0) {
- return "";
- } else {
- return buffer.substring(0, buffer.length() - 1);
- }
-
- } finally {
- in.close();
- }
-
- }
-
- public long usage(Path inPath, Configuration conf) throws IOException {
- FileSystem fs = inPath.getFileSystem(conf);
- FileStatus status[] = fs.globStatus(inPath);
- if (status == null || status.length == 0) {
- return 0;
- }
- long totalSize = 0;
- for (FileStatus statu : status) {
- totalSize += fs.getContentSummary(statu.getPath()).getLength();
- }
- return totalSize;
- }
+ private static Logger LOG = Logger.getLogger(LateDataHandler.class);
+
+ static PrintStream stream = System.out;
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ Path confPath = new Path("file:///"
+ + System.getProperty("oozie.action.conf.xml"));
+
+ LOG.info(confPath + " found ? "
+ + confPath.getFileSystem(conf).exists(confPath));
+ conf.addResource(confPath);
+ ToolRunner.run(conf, new LateDataHandler(), args);
+ }
+
+ private static CommandLine getCommand(String[] args) throws ParseException {
+ Options options = new Options();
+
+ Option opt = new Option("out", true, "Out file name");
+ opt.setRequired(true);
+ options.addOption(opt);
+ opt = new Option("paths", true,
+ "Comma separated path list, further separated by #");
+ opt.setRequired(true);
+ options.addOption(opt);
+ opt = new Option("falconInputFeeds", true,
+ "Input feed names, further separated by #");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return new GnuParser().parse(options, args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ CommandLine command = getCommand(args);
+
+ Path file = new Path(command.getOptionValue("out"));
+ Map<String, Long> map = new LinkedHashMap<String, Long>();
+ String pathStr = getOptionValue(command, "paths");
+ if (pathStr == null) {
+ return 0;
+ }
+
+ String[] pathGroups = pathStr.split("#");
+ String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split(
+ "#");
+ for (int index = 0; index < pathGroups.length; index++) {
+ long usage = 0;
+ for (String pathElement : pathGroups[index].split(",")) {
+ Path inPath = new Path(pathElement);
+ usage += usage(inPath, getConf());
+ }
+ map.put(inputFeeds[index], usage);
+ }
+ LOG.info("MAP data: " + map);
+
+ OutputStream out = file.getFileSystem(getConf()).create(file);
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ out.write((entry.getKey() + "=" + entry.getValue() + "\n")
+ .getBytes());
+ }
+ out.close();
+ return 0;
+ }
+
+ private String getOptionValue(CommandLine command, String option) {
+ String value = command.getOptionValue(option);
+ if (value.equals("null")) {
+ return null;
+ }
+ return value;
+ }
+
+ public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
+ throws Exception {
+
+ StringBuffer buffer = new StringBuffer();
+ BufferedReader in = new BufferedReader(new InputStreamReader(file
+ .getFileSystem(conf).open(file)));
+ String line;
+ try {
+ Map<String, Long> recorded = new LinkedHashMap<String, Long>();
+ while ((line = in.readLine()) != null) {
+ if (line.isEmpty()) {
+ continue;
+ }
+ int index = line.indexOf('=');
+ String key = line.substring(0, index);
+ long size = Long.parseLong(line.substring(index + 1));
+ recorded.put(key, size);
+ }
+
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ if (recorded.get(entry.getKey()) == null) {
+ LOG.info("No matching key " + entry.getKey());
+ continue;
+ }
+ if (!recorded.get(entry.getKey()).equals(entry.getValue())) {
+ LOG.info("Recorded size:" + recorded.get(entry.getKey()) + " is different from new size"
+ + entry.getValue());
+ buffer.append(entry.getKey()).append(',');
+ }
+ }
+ if (buffer.length() == 0) {
+ return "";
+ } else {
+ return buffer.substring(0, buffer.length() - 1);
+ }
+
+ } finally {
+ in.close();
+ }
+
+ }
+
+ public long usage(Path inPath, Configuration conf) throws IOException {
+ FileSystem fs = inPath.getFileSystem(conf);
+ FileStatus status[] = fs.globStatus(inPath);
+ if (status == null || status.length == 0) {
+ return 0;
+ }
+ long totalSize = 0;
+ for (FileStatus statu : status) {
+ totalSize += fs.getContentSummary(statu.getPath()).getLength();
+ }
+ return totalSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index 5f050ca..7a22704 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -19,20 +19,20 @@ package org.apache.falcon.rerun.event;
public class LaterunEvent extends RerunEvent {
- public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
- long delay, String entityType, String entityName, String instance,
- int runId) {
- super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
- instance, runId);
- }
+ public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
+ long delay, String entityType, String entityName, String instance,
+ int runId) {
+ super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+ instance, runId);
+ }
- @Override
- public String toString() {
- return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
- + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
- + delayInMilliSec + SEP + "entityType=" + entityType + SEP
- + "entityName=" + entityName + SEP + "instance=" + instance
- + SEP + "runId=" + runId;
- }
+ @Override
+ public String toString() {
+ return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
+ + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
+ + delayInMilliSec + SEP + "entityType=" + entityType + SEP
+ + "entityName=" + entityName + SEP + "instance=" + instance
+ + SEP + "runId=" + runId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 9526e0a..5a1e3e1 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -23,91 +23,91 @@ import java.util.concurrent.TimeUnit;
public class RerunEvent implements Delayed {
- protected static final String SEP = "*";
-
- public enum RerunType{
- RETRY, LATE
- }
-
- protected String clusterName;
- protected String wfId;
- protected long msgInsertTime;
- protected long delayInMilliSec;
- protected String entityType;
- protected String entityName;
- protected String instance;
- protected int runId;
-
- public RerunEvent(String clusterName, String wfId,
- long msgInsertTime, long delay, String entityType, String entityName,
- String instance, int runId) {
- this.clusterName = clusterName;
- this.wfId = wfId;
- this.msgInsertTime = msgInsertTime;
- this.delayInMilliSec = delay;
- this.entityName = entityName;
- this.instance = instance;
- this.runId = runId;
- this.entityType=entityType;
- }
-
- public String getClusterName() {
- return clusterName;
- }
-
- public String getWfId() {
- return wfId;
- }
-
- public long getDelayInMilliSec() {
- return delayInMilliSec;
- }
-
- public String getEntityName() {
- return entityName;
- }
-
- public String getInstance() {
- return instance;
- }
-
- public int getRunId() {
- return runId;
- }
-
- public String getEntityType(){
- return entityType;
- }
-
- @Override
- public int compareTo(Delayed o) {
+ protected static final String SEP = "*";
+
+ public enum RerunType {
+ RETRY, LATE
+ }
+
+ protected String clusterName;
+ protected String wfId;
+ protected long msgInsertTime;
+ protected long delayInMilliSec;
+ protected String entityType;
+ protected String entityName;
+ protected String instance;
+ protected int runId;
+
+ public RerunEvent(String clusterName, String wfId,
+ long msgInsertTime, long delay, String entityType, String entityName,
+ String instance, int runId) {
+ this.clusterName = clusterName;
+ this.wfId = wfId;
+ this.msgInsertTime = msgInsertTime;
+ this.delayInMilliSec = delay;
+ this.entityName = entityName;
+ this.instance = instance;
+ this.runId = runId;
+ this.entityType = entityType;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getWfId() {
+ return wfId;
+ }
+
+ public long getDelayInMilliSec() {
+ return delayInMilliSec;
+ }
+
+ public String getEntityName() {
+ return entityName;
+ }
+
+ public String getInstance() {
+ return instance;
+ }
+
+ public int getRunId() {
+ return runId;
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
RerunEvent event = (RerunEvent) o;
return new Date(msgInsertTime + delayInMilliSec).
compareTo(new Date(event.msgInsertTime + event.delayInMilliSec));
- }
-
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert((msgInsertTime - System.currentTimeMillis())
- + delayInMilliSec, TimeUnit.MILLISECONDS);
- }
-
- public long getMsgInsertTime() {
- return msgInsertTime;
- }
-
- public void setMsgInsertTime(long msgInsertTime) {
- this.msgInsertTime = msgInsertTime;
- }
-
- public RerunType getType() {
- if (this instanceof RetryEvent) {
- return RerunType.RETRY;
- } else if (this instanceof LaterunEvent) {
- return RerunType.LATE;
- } else {
- return null;
- }
- }
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert((msgInsertTime - System.currentTimeMillis())
+ + delayInMilliSec, TimeUnit.MILLISECONDS);
+ }
+
+ public long getMsgInsertTime() {
+ return msgInsertTime;
+ }
+
+ public void setMsgInsertTime(long msgInsertTime) {
+ this.msgInsertTime = msgInsertTime;
+ }
+
+ public RerunType getType() {
+ if (this instanceof RetryEvent) {
+ return RerunType.RETRY;
+ } else if (this instanceof LaterunEvent) {
+ return RerunType.LATE;
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index fcdb836..c5e1e80 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -17,53 +17,54 @@
*/
package org.apache.falcon.rerun.event;
+import org.apache.falcon.rerun.event.RerunEvent.RerunType;
+
import java.util.HashMap;
import java.util.Map;
-import org.apache.falcon.rerun.event.RerunEvent.RerunType;
-
public class RerunEventFactory<T extends RerunEvent> {
- public T getRerunEvent(String type, String line) {
- if (type.startsWith(RerunType.RETRY.name())) {
- return retryEventFromString(line);
- } else if (type.startsWith(RerunType.LATE.name())) {
- return lateEventFromString(line);
- } else
- return null;
- }
+ public T getRerunEvent(String type, String line) {
+ if (type.startsWith(RerunType.RETRY.name())) {
+ return retryEventFromString(line);
+ } else if (type.startsWith(RerunType.LATE.name())) {
+ return lateEventFromString(line);
+ } else {
+ return null;
+ }
+ }
- @SuppressWarnings("unchecked")
- private T lateEventFromString(String line) {
- Map<String, String> map = getMap(line);
- return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
- Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
- .get("delayInMilliSec")), map.get("entityType"),
- map.get("entityName"), map.get("instance"),
- Integer.parseInt(map.get("runId")));
- }
+ @SuppressWarnings("unchecked")
+ private T lateEventFromString(String line) {
+ Map<String, String> map = getMap(line);
+ return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
+ Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
+ .get("delayInMilliSec")), map.get("entityType"),
+ map.get("entityName"), map.get("instance"),
+ Integer.parseInt(map.get("runId")));
+ }
- @SuppressWarnings("unchecked")
- public T retryEventFromString(String line) {
- Map<String, String> map = getMap(line);
- return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
- Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
- .get("delayInMilliSec")), map.get("entityType"),
- map.get("entityName"), map.get("instance"),
- Integer.parseInt(map.get("runId")), Integer.parseInt(map
- .get("attempts")), Integer.parseInt(map
- .get("failRetryCount")));
+ @SuppressWarnings("unchecked")
+ public T retryEventFromString(String line) {
+ Map<String, String> map = getMap(line);
+ return (T) new RetryEvent(map.get("clusterName"), map.get("wfId"),
+ Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map
+ .get("delayInMilliSec")), map.get("entityType"),
+ map.get("entityName"), map.get("instance"),
+ Integer.parseInt(map.get("runId")), Integer.parseInt(map
+ .get("attempts")), Integer.parseInt(map
+ .get("failRetryCount")));
- }
+ }
- private Map<String, String> getMap(String message) {
- String[] items = message.split("\\" + RerunEvent.SEP);
- Map<String, String> map = new HashMap<String, String>();
- for (String item : items) {
- String[] pair = item.split("=");
- map.put(pair[0], pair[1]);
- }
- return map;
- }
+ private Map<String, String> getMap(String message) {
+ String[] items = message.split("\\" + RerunEvent.SEP);
+ Map<String, String> map = new HashMap<String, String>();
+ for (String item : items) {
+ String[] pair = item.split("=");
+ map.put(pair[0], pair[1]);
+ }
+ return map;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index 7ff4361..33248b8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -19,39 +19,39 @@ package org.apache.falcon.rerun.event;
public class RetryEvent extends RerunEvent {
- private int attempts;
- private int failRetryCount;
-
- public RetryEvent(String clusterName, String wfId, long msgInsertTime,
- long delay, String entityType, String entityName, String instance,
- int runId, int attempts, int failRetryCount) {
- super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
- instance, runId);
- this.attempts = attempts;
- this.failRetryCount = failRetryCount;
- }
-
- public int getAttempts() {
- return attempts;
- }
-
- public int getFailRetryCount() {
- return failRetryCount;
- }
-
- public void setFailRetryCount(int failRetryCount) {
- this.failRetryCount = failRetryCount;
- }
-
- @Override
- public String toString() {
-
- return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
- + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
- + delayInMilliSec + SEP + "entityType=" + entityType + SEP
- + "entityName=" + entityName + SEP + "instance=" + instance
- + SEP + "runId=" + runId + SEP + "attempts=" + attempts + SEP
- + "failRetryCount=" + failRetryCount;
- }
+ private int attempts;
+ private int failRetryCount;
+
+ public RetryEvent(String clusterName, String wfId, long msgInsertTime,
+ long delay, String entityType, String entityName, String instance,
+ int runId, int attempts, int failRetryCount) {
+ super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
+ instance, runId);
+ this.attempts = attempts;
+ this.failRetryCount = failRetryCount;
+ }
+
+ public int getAttempts() {
+ return attempts;
+ }
+
+ public int getFailRetryCount() {
+ return failRetryCount;
+ }
+
+ public void setFailRetryCount(int failRetryCount) {
+ this.failRetryCount = failRetryCount;
+ }
+
+ @Override
+ public String toString() {
+
+ return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
+ + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
+ + delayInMilliSec + SEP + "entityType=" + entityType + SEP
+ + "entityName=" + entityName + SEP + "instance=" + instance
+ + SEP + "runId=" + runId + SEP + "attempts=" + attempts + SEP
+ + "failRetryCount=" + failRetryCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 5e2fa50..fa1d9e3 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -27,47 +27,47 @@ import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.log4j.Logger;
public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends AbstractRerunHandler<T, DelayedQueue<T>>>
- implements Runnable {
+ implements Runnable {
- protected static final Logger LOG = Logger
- .getLogger(AbstractRerunConsumer.class);
+ protected static final Logger LOG = Logger
+ .getLogger(AbstractRerunConsumer.class);
- protected M handler;
+ protected M handler;
- public AbstractRerunConsumer(M handler) {
- this.handler = handler;
- }
+ public AbstractRerunConsumer(M handler) {
+ this.handler = handler;
+ }
- @Override
- public void run() {
- int attempt = 1;
- AbstractRerunPolicy policy = new ExpBackoffPolicy();
- Frequency frequency = new Frequency("minutes(1)");
- while (true) {
- try {
- T message = null;
- try {
- message = handler.takeFromQueue();
- attempt = 1;
- } catch (FalconException e) {
- LOG.error("Error while reading message from the queue: ", e);
- GenericAlert.alertRerunConsumerFailed(
- "Error while reading message from the queue: ", e);
- Thread.sleep(policy.getDelay(frequency, attempt));
- handler.reconnect();
- attempt++;
- continue;
- }
- String jobStatus = handler.getWfEngine().getWorkflowStatus(
- message.getClusterName(), message.getWfId());
- handleRerun(message.getClusterName(), jobStatus, message);
+ @Override
+ public void run() {
+ int attempt = 1;
+ AbstractRerunPolicy policy = new ExpBackoffPolicy();
+ Frequency frequency = new Frequency("minutes(1)");
+ while (true) {
+ try {
+ T message = null;
+ try {
+ message = handler.takeFromQueue();
+ attempt = 1;
+ } catch (FalconException e) {
+ LOG.error("Error while reading message from the queue: ", e);
+ GenericAlert.alertRerunConsumerFailed(
+ "Error while reading message from the queue: ", e);
+ Thread.sleep(policy.getDelay(frequency, attempt));
+ handler.reconnect();
+ attempt++;
+ continue;
+ }
+ String jobStatus = handler.getWfEngine().getWorkflowStatus(
+ message.getClusterName(), message.getWfId());
+ handleRerun(message.getClusterName(), jobStatus, message);
- } catch (Throwable e) {
- LOG.error("Error in rerun consumer:", e);
- }
- }
+ } catch (Throwable e) {
+ LOG.error("Error in rerun consumer:", e);
+ }
+ }
- }
+ }
- protected abstract void handleRerun(String cluster, String jobStatus, T message);
+ protected abstract void handleRerun(String cluster, String jobStatus, T message);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 66f9c2a..4a90b9f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -29,44 +29,44 @@ import org.apache.log4j.Logger;
public abstract class AbstractRerunHandler<T extends RerunEvent, M extends DelayedQueue<T>> {
- protected static final Logger LOG = Logger
- .getLogger(LateRerunHandler.class);
- protected M delayQueue;
- private AbstractWorkflowEngine wfEngine;
+ protected static final Logger LOG = Logger
+ .getLogger(LateRerunHandler.class);
+ protected M delayQueue;
+ private AbstractWorkflowEngine wfEngine;
- public void init(M delayQueue) throws FalconException {
- this.wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- this.delayQueue = delayQueue;
- this.delayQueue.init();
- }
+ public void init(M delayQueue) throws FalconException {
+ this.wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+ this.delayQueue = delayQueue;
+ this.delayQueue.init();
+ }
- public abstract void handleRerun(String cluster, String entityType,
- String entityName, String nominalTime, String runId, String wfId,
- long msgReceivedTime);
+ public abstract void handleRerun(String cluster, String entityType,
+ String entityName, String nominalTime, String runId, String wfId,
+ long msgReceivedTime);
- public AbstractWorkflowEngine getWfEngine() {
- return wfEngine;
- }
+ public AbstractWorkflowEngine getWfEngine() {
+ return wfEngine;
+ }
- public boolean offerToQueue(T event) throws FalconException {
- return delayQueue.offer(event);
- }
+ public boolean offerToQueue(T event) throws FalconException {
+ return delayQueue.offer(event);
+ }
- public T takeFromQueue() throws FalconException {
- return delayQueue.take();
- }
-
- public void reconnect() throws FalconException {
- delayQueue.reconnect();
- }
+ public T takeFromQueue() throws FalconException {
+ return delayQueue.take();
+ }
- public Entity getEntity(String entityType, String entityName)
- throws FalconException {
- return EntityUtil.getEntity(entityType, entityName);
- }
+ public void reconnect() throws FalconException {
+ delayQueue.reconnect();
+ }
- public Retry getRetry(Entity entity) throws FalconException {
- return EntityUtil.getRetry(entity);
- }
+ public Entity getEntity(String entityType, String entityName)
+ throws FalconException {
+ return EntityUtil.getEntity(entityType, entityName);
+ }
+
+ public Retry getRetry(Entity entity) throws FalconException {
+ return EntityUtil.getRetry(entity);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index fc88f0e..03561fc 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -17,16 +17,6 @@
*/
package org.apache.falcon.rerun.handler;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
@@ -35,115 +25,120 @@ import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.latedata.LateDataHandler;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
-import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.util.*;
public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEvent>>>
- extends AbstractRerunConsumer<LaterunEvent, T> {
+ extends AbstractRerunConsumer<LaterunEvent, T> {
- public LateRerunConsumer(T handler) {
- super(handler);
- }
+ public LateRerunConsumer(T handler) {
+ super(handler);
+ }
- @Override
- protected void handleRerun(String cluster, String jobStatus,
- LaterunEvent message) {
- try {
- if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
- || jobStatus.equals("SUSPENDED")) {
- LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as job status is running:"
- + message.getWfId());
- message.setMsgInsertTime(System.currentTimeMillis());
- handler.offerToQueue(message);
- return;
- }
+ @Override
+ protected void handleRerun(String cluster, String jobStatus,
+ LaterunEvent message) {
+ try {
+ if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
+ || jobStatus.equals("SUSPENDED")) {
+ LOG.debug(
+ "Re-enqueing message in LateRerunHandler for workflow with same delay as job status is running:"
+ + message.getWfId());
+ message.setMsgInsertTime(System.currentTimeMillis());
+ handler.offerToQueue(message);
+ return;
+ }
- String detectLate = detectLate(message);
+ String detectLate = detectLate(message);
- if (detectLate.equals("")) {
- LOG.debug("No Late Data Detected, scheduling next late rerun for wf-id: "
- + message.getWfId()
- + " at "
- + SchemaHelper.formatDateUTC(new Date()));
- handler.handleRerun(cluster, message.getEntityType(),
- message.getEntityName(), message.getInstance(),
- Integer.toString(message.getRunId()),
- message.getWfId(), System.currentTimeMillis());
- return;
- }
+ if (detectLate.equals("")) {
+ LOG.debug("No Late Data Detected, scheduling next late rerun for wf-id: "
+ + message.getWfId()
+ + " at "
+ + SchemaHelper.formatDateUTC(new Date()));
+ handler.handleRerun(cluster, message.getEntityType(),
+ message.getEntityName(), message.getInstance(),
+ Integer.toString(message.getRunId()),
+ message.getWfId(), System.currentTimeMillis());
+ return;
+ }
- LOG.info("Late changes detected in the following feeds: "
- + detectLate);
+ LOG.info("Late changes detected in the following feeds: "
+ + detectLate);
- handler.getWfEngine().reRun(message.getClusterName(),
- message.getWfId(), null);
- LOG.info("Scheduled late rerun for wf-id: " + message.getWfId()
- + " on cluster: " + message.getClusterName());
- } catch (Exception e) {
- LOG.warn(
- "Late Re-run failed for instance "
- + message.getEntityName() + ":"
- + message.getInstance() + " after "
- + message.getDelayInMilliSec() + " with message:",
- e);
- GenericAlert.alertLateRerunFailed(message.getEntityType(),
- message.getEntityName(), message.getInstance(),
- message.getWfId(), Integer.toString(message.getRunId()),
- e.getMessage());
- }
+ handler.getWfEngine().reRun(message.getClusterName(),
+ message.getWfId(), null);
+ LOG.info("Scheduled late rerun for wf-id: " + message.getWfId()
+ + " on cluster: " + message.getClusterName());
+ } catch (Exception e) {
+ LOG.warn(
+ "Late Re-run failed for instance "
+ + message.getEntityName() + ":"
+ + message.getInstance() + " after "
+ + message.getDelayInMilliSec() + " with message:",
+ e);
+ GenericAlert.alertLateRerunFailed(message.getEntityType(),
+ message.getEntityName(), message.getInstance(),
+ message.getWfId(), Integer.toString(message.getRunId()),
+ e.getMessage());
+ }
- }
+ }
- public String detectLate(LaterunEvent message) throws Exception {
- LateDataHandler late = new LateDataHandler();
- String falconInputFeeds = handler.getWfEngine().getWorkflowProperty(
- message.getClusterName(), message.getWfId(), "falconInputFeeds");
- String logDir = handler.getWfEngine().getWorkflowProperty(
- message.getClusterName(), message.getWfId(), "logDir");
- String falconInPaths = handler.getWfEngine().getWorkflowProperty(
- message.getClusterName(), message.getWfId(), "falconInPaths");
- String nominalTime = handler.getWfEngine().getWorkflowProperty(
- message.getClusterName(), message.getWfId(), "nominalTime");
- String srcClusterName = handler.getWfEngine().getWorkflowProperty(
- message.getClusterName(), message.getWfId(), "srcClusterName");
+ public String detectLate(LaterunEvent message) throws Exception {
+ LateDataHandler late = new LateDataHandler();
+ String falconInputFeeds = handler.getWfEngine().getWorkflowProperty(
+ message.getClusterName(), message.getWfId(), "falconInputFeeds");
+ String logDir = handler.getWfEngine().getWorkflowProperty(
+ message.getClusterName(), message.getWfId(), "logDir");
+ String falconInPaths = handler.getWfEngine().getWorkflowProperty(
+ message.getClusterName(), message.getWfId(), "falconInPaths");
+ String nominalTime = handler.getWfEngine().getWorkflowProperty(
+ message.getClusterName(), message.getWfId(), "nominalTime");
+ String srcClusterName = handler.getWfEngine().getWorkflowProperty(
+ message.getClusterName(), message.getWfId(), "srcClusterName");
- Configuration conf = handler.getConfiguration(message.getClusterName(),
- message.getWfId());
- Path lateLogPath = handler.getLateLogPath(logDir, nominalTime,
- srcClusterName);
- FileSystem fs = FileSystem.get(conf);
- if (!fs.exists(lateLogPath)) {
- LOG.warn("Late log file:" + lateLogPath + " not found:");
- return "";
- }
- Map<String, Long> feedSizes = new LinkedHashMap<String, Long>();
- String[] pathGroups = falconInPaths.split("#");
- String[] inputFeeds = falconInputFeeds.split("#");
- Entity entity = EntityUtil.getEntity(message.getEntityType(),
- message.getEntityName());
+ Configuration conf = handler.getConfiguration(message.getClusterName(),
+ message.getWfId());
+ Path lateLogPath = handler.getLateLogPath(logDir, nominalTime,
+ srcClusterName);
+ FileSystem fs = FileSystem.get(conf);
+ if (!fs.exists(lateLogPath)) {
+ LOG.warn("Late log file:" + lateLogPath + " not found:");
+ return "";
+ }
+ Map<String, Long> feedSizes = new LinkedHashMap<String, Long>();
+ String[] pathGroups = falconInPaths.split("#");
+ String[] inputFeeds = falconInputFeeds.split("#");
+ Entity entity = EntityUtil.getEntity(message.getEntityType(),
+ message.getEntityName());
- List<String> lateFeed = new ArrayList<String>();
- if (EntityUtil.getLateProcess(entity) != null) {
- for (LateInput li : EntityUtil.getLateProcess(entity)
- .getLateInputs()) {
- lateFeed.add(li.getInput());
- }
- for (int index = 0; index < pathGroups.length; index++) {
- if (lateFeed.contains(inputFeeds[index])) {
- long usage = 0;
- for (String pathElement : pathGroups[index].split(",")) {
- Path inPath = new Path(pathElement);
- usage += late.usage(inPath, conf);
- }
- feedSizes.put(inputFeeds[index], usage);
- }
- }
- } else {
- LOG.warn("Late process is not configured for entity: "
- + message.getEntityType() + "(" + message.getEntityName()
- + ")");
- }
+ List<String> lateFeed = new ArrayList<String>();
+ if (EntityUtil.getLateProcess(entity) != null) {
+ for (LateInput li : EntityUtil.getLateProcess(entity)
+ .getLateInputs()) {
+ lateFeed.add(li.getInput());
+ }
+ for (int index = 0; index < pathGroups.length; index++) {
+ if (lateFeed.contains(inputFeeds[index])) {
+ long usage = 0;
+ for (String pathElement : pathGroups[index].split(",")) {
+ Path inPath = new Path(pathElement);
+ usage += late.usage(inPath, conf);
+ }
+ feedSizes.put(inputFeeds[index], usage);
+ }
+ }
+ } else {
+ LOG.warn("Late process is not configured for entity: "
+ + message.getEntityType() + "(" + message.getEntityName()
+ + ")");
+ }
- return late.detectChanges(lateLogPath, feedSizes, conf);
- }
+ return late.detectChanges(lateLogPath, feedSizes, conf);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index e2145cb..ad19157 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -17,12 +17,6 @@
*/
package org.apache.falcon.rerun.handler;
-import java.util.Date;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityUtil;
@@ -31,10 +25,7 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.*;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.rerun.event.LaterunEvent;
@@ -42,185 +33,192 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
import org.apache.falcon.rerun.policy.RerunPolicyFactory;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Date;
public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
- AbstractRerunHandler<LaterunEvent, M> {
-
- @Override
- public void handleRerun(String cluster, String entityType,
- String entityName, String nominalTime, String runId, String wfId,
- long msgReceivedTime) {
-
- try {
- Entity entity = EntityUtil.getEntity(entityType, entityName);
- try {
- if (EntityUtil.getLateProcess(entity) == null
- || EntityUtil.getLateProcess(entity).getLateInputs() == null
- || EntityUtil.getLateProcess(entity).getLateInputs()
- .size() == 0) {
- LOG.info("Late rerun not configured for entity: " + entityName);
- return;
- }
- } catch (FalconException e) {
- LOG.error("Unable to get Late Process for entity:" + entityName);
- return;
- }
- int intRunId = Integer.parseInt(runId);
- Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
- Long wait = getEventDelay(entity, nominalTime);
- if (wait == -1) {
- LOG.info("Late rerun expired for entity: "+entityType+"("+entityName+")");
- String logDir = this.getWfEngine().getWorkflowProperty(cluster,
- wfId, "logDir");
- String srcClusterName = this.getWfEngine().getWorkflowProperty(
- cluster, wfId, "srcClusterName");
- Path lateLogPath = this.getLateLogPath(logDir,
- EntityUtil.UTCtoURIDate(nominalTime), srcClusterName);
- LOG.info("Going to delete path:" +lateLogPath);
- FileSystem fs = FileSystem.get(getConfiguration(cluster,
- wfId));
- if (fs.exists(lateLogPath)) {
- boolean deleted = fs.delete(lateLogPath, true);
- if (deleted == true) {
- LOG.info("Successfully deleted late file path:"
- + lateLogPath);
- }
- }
- return;
- }
-
- LOG.debug("Scheduling the late rerun for entity instance : "
- + entityType + "(" + entityName + ")" + ":" + nominalTime
- + " And WorkflowId: " + wfId);
- LaterunEvent event = new LaterunEvent(cluster, wfId,
- msgInsertTime.getTime(), wait, entityType, entityName,
- nominalTime, intRunId);
- offerToQueue(event);
- } catch (Exception e) {
- LOG.error("Unable to schedule late rerun for entity instance : "
- + entityType + "(" + entityName + ")" + ":" + nominalTime
- + " And WorkflowId: " + wfId, e);
- GenericAlert.alertLateRerunFailed(entityType, entityName,
- nominalTime, wfId, runId, e.getMessage());
- }
- }
-
- private long getEventDelay(Entity entity, String nominalTime)
- throws FalconException {
-
- Date instanceDate = EntityUtil.parseDateUTC(nominalTime);
- LateProcess lateProcess = EntityUtil.getLateProcess(entity);
- if (lateProcess == null) {
- LOG.warn("Late run not applicable for entity:"
- + entity.getEntityType() + "(" + entity.getName() + ")");
- return -1;
- }
- PolicyType latePolicy = lateProcess.getPolicy();
- Date cutOffTime = getCutOffTime(entity, nominalTime);
- Date now = new Date();
- Long wait = null;
-
- if (now.after(cutOffTime)) {
- LOG.warn("Feed Cut Off time: "
- + SchemaHelper.formatDateUTC(cutOffTime)
- + " has expired, Late Rerun can not be scheduled");
- return -1;
- } else {
- AbstractRerunPolicy rerunPolicy = RerunPolicyFactory
- .getRetryPolicy(latePolicy);
- wait = rerunPolicy.getDelay(lateProcess.getDelay(), instanceDate,
- cutOffTime);
- }
- return wait;
- }
-
- public static Date addTime(Date date, int milliSecondsToAdd) {
- return new Date(date.getTime() + milliSecondsToAdd);
- }
-
- public static Date getCutOffTime(Entity entity, String nominalTime)
- throws FalconException {
-
- ConfigurationStore store = ConfigurationStore.get();
- ExpressionHelper evaluator = ExpressionHelper.get();
- Date instanceStart = EntityUtil.parseDateUTC(nominalTime);
- ExpressionHelper.setReferenceDate(instanceStart);
- Date endTime = new Date();
- Date feedCutOff = new Date(0);
- if (entity.getEntityType() == EntityType.FEED) {
- if (((Feed) entity).getLateArrival() == null) {
- LOG.debug("Feed's " + entity.getName()
- + " late arrival cut-off is not configured, returning");
- return feedCutOff;
- }
- String lateCutOff = ((Feed) entity).getLateArrival().getCutOff()
- .toString();
- endTime = EntityUtil.parseDateUTC(nominalTime);
- long feedCutOffPeriod = evaluator.evaluate(lateCutOff, Long.class);
- endTime = addTime(endTime, (int) feedCutOffPeriod);
- return endTime;
- } else if (entity.getEntityType() == EntityType.PROCESS) {
- Process process = (Process) entity;
- for (LateInput lp : process.getLateProcess().getLateInputs()) {
- Feed feed = null;
- String endInstanceTime = "";
- for (Input input : process.getInputs().getInputs()) {
- if (input.getName().equals(lp.getInput())) {
- endInstanceTime = input.getEnd();
- feed = store.get(EntityType.FEED, input.getFeed());
- break;
- }
- }
- if (feed.getLateArrival() == null) {
- LOG.debug("Feed's " + feed.getName()
- + " late arrival cut-off is not configured, ignoring this feed");
- continue;
- }
- String lateCutOff = feed.getLateArrival().getCutOff()
- .toString();
- endTime = evaluator.evaluate(endInstanceTime, Date.class);
- long feedCutOffPeriod = evaluator.evaluate(lateCutOff,
- Long.class);
- endTime = addTime(endTime, (int) feedCutOffPeriod);
-
- if (endTime.after(feedCutOff))
- feedCutOff = endTime;
- }
- return feedCutOff;
- } else {
- throw new FalconException(
- "Invalid entity while getting cut-off time:"
- + entity.getName());
- }
- }
-
- @Override
- public void init(M delayQueue) throws FalconException {
- super.init(delayQueue);
- Thread daemon = new Thread(new LateRerunConsumer(this));
- daemon.setName("LaterunHandler");
- daemon.setDaemon(true);
- daemon.start();
- LOG.info("Laterun Handler thread started");
- }
-
- public Path getLateLogPath(String logDir, String nominalTime,
- String srcClusterName) {
- //SrcClusterName valid only in case of feed
- return new Path(logDir + "/latedata/" + nominalTime + "/"
- + (srcClusterName == null
- ? "" : srcClusterName));
-
- }
-
- public Configuration getConfiguration(String cluster, String wfId)
- throws FalconException {
- Configuration conf = new Configuration();
- conf.set(
- CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
- this.getWfEngine().getWorkflowProperty(cluster, wfId,
- AbstractWorkflowEngine.NAME_NODE));
- return conf;
- }
+ AbstractRerunHandler<LaterunEvent, M> {
+
+ @Override // Handles one workflow instance: queues a LaterunEvent for it, or deletes its late-data log path when no rerun can be scheduled.
+ public void handleRerun(String cluster, String entityType,
+ String entityName, String nominalTime, String runId, String wfId,
+ long msgReceivedTime) { // msgReceivedTime is not read anywhere in this method
+
+ try {
+ Entity entity = EntityUtil.getEntity(entityType, entityName);
+ try {
+ if (EntityUtil.getLateProcess(entity) == null // guard: proceed only when a late-process with at least one late input exists
+ || EntityUtil.getLateProcess(entity).getLateInputs() == null
+ || EntityUtil.getLateProcess(entity).getLateInputs()
+ .size() == 0) {
+ LOG.info("Late rerun not configured for entity: " + entityName);
+ return;
+ }
+ } catch (FalconException e) { // lookup failure: logged without the exception object, then the rerun is skipped
+ LOG.error("Unable to get Late Process for entity:" + entityName);
+ return;
+ }
+ int intRunId = Integer.parseInt(runId); // a NumberFormatException here is caught by the outer catch below
+ Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime); // insert time is derived from nominalTime, not from msgReceivedTime
+ Long wait = getEventDelay(entity, nominalTime); // -1 signals that no late rerun should be scheduled
+ if (wait == -1) { // Long vs int literal: the Long is unboxed, so this compares by value
+ LOG.info("Late rerun expired for entity: " + entityType + "(" + entityName + ")");
+ String logDir = this.getWfEngine().getWorkflowProperty(cluster,
+ wfId, "logDir");
+ String srcClusterName = this.getWfEngine().getWorkflowProperty(
+ cluster, wfId, "srcClusterName");
+ Path lateLogPath = this.getLateLogPath(logDir,
+ EntityUtil.UTCtoURIDate(nominalTime), srcClusterName);
+ LOG.info("Going to delete path:" + lateLogPath);
+ FileSystem fs = FileSystem.get(getConfiguration(cluster,
+ wfId));
+ if (fs.exists(lateLogPath)) {
+ boolean deleted = fs.delete(lateLogPath, true); // second argument is Hadoop's "recursive" flag
+ if (deleted == true) { // a false result is silently ignored (nothing is logged)
+ LOG.info("Successfully deleted late file path:"
+ + lateLogPath);
+ }
+ }
+ return;
+ }
+
+ LOG.debug("Scheduling the late rerun for entity instance : "
+ + entityType + "(" + entityName + ")" + ":" + nominalTime
+ + " And WorkflowId: " + wfId);
+ LaterunEvent event = new LaterunEvent(cluster, wfId,
+ msgInsertTime.getTime(), wait, entityType, entityName,
+ nominalTime, intRunId);
+ offerToQueue(event); // offerToQueue is not defined in this chunk
+ } catch (Exception e) { // nothing propagates to the caller: every failure is logged and reported via GenericAlert
+ LOG.error("Unable to schedule late rerun for entity instance : "
+ + entityType + "(" + entityName + ")" + ":" + nominalTime
+ + " And WorkflowId: " + wfId, e);
+ GenericAlert.alertLateRerunFailed(entityType, entityName,
+ nominalTime, wfId, runId, e.getMessage());
+ }
+ }
+
+ private long getEventDelay(Entity entity, String nominalTime) // Returns the rerun delay from the entity's late policy, or -1 when no rerun should be scheduled.
+ throws FalconException {
+
+ Date instanceDate = EntityUtil.parseDateUTC(nominalTime);
+ LateProcess lateProcess = EntityUtil.getLateProcess(entity);
+ if (lateProcess == null) {
+ LOG.warn("Late run not applicable for entity:"
+ + entity.getEntityType() + "(" + entity.getName() + ")");
+ return -1; // no late-process configuration on this entity
+ }
+ PolicyType latePolicy = lateProcess.getPolicy();
+ Date cutOffTime = getCutOffTime(entity, nominalTime);
+ Date now = new Date(); // wall-clock time at the moment of the call
+ Long wait = null;
+
+ if (now.after(cutOffTime)) {
+ LOG.warn("Feed Cut Off time: "
+ + SchemaHelper.formatDateUTC(cutOffTime)
+ + " has expired, Late Rerun can not be scheduled");
+ return -1; // cut-off already passed
+ } else {
+ AbstractRerunPolicy rerunPolicy = RerunPolicyFactory
+ .getRetryPolicy(latePolicy);
+ wait = rerunPolicy.getDelay(lateProcess.getDelay(), instanceDate, // delay is computed by the policy; its unit is not visible here
+ cutOffTime);
+ }
+ return wait; // auto-unboxed from Long to long
+ }
+
+ public static Date addTime(Date date, int milliSecondsToAdd) { // Returns a new Date offset by milliSecondsToAdd; the argument is not modified.
+ return new Date(date.getTime() + milliSecondsToAdd); // int is widened to long before the addition
+ }
+
+ public static Date getCutOffTime(Entity entity, String nominalTime) // Computes the late-arrival cut-off time for a FEED or PROCESS entity; other entity types throw FalconException.
+ throws FalconException {
+
+ ConfigurationStore store = ConfigurationStore.get();
+ ExpressionHelper evaluator = ExpressionHelper.get();
+ Date instanceStart = EntityUtil.parseDateUTC(nominalTime);
+ ExpressionHelper.setReferenceDate(instanceStart); // static setter; presumably anchors the evaluate() calls below to the nominal time - confirm in ExpressionHelper
+ Date endTime = new Date();
+ Date feedCutOff = new Date(0); // epoch: returned as-is when no cut-off is configured
+ if (entity.getEntityType() == EntityType.FEED) {
+ if (((Feed) entity).getLateArrival() == null) {
+ LOG.debug("Feed's " + entity.getName()
+ + " late arrival cut-off is not configured, returning");
+ return feedCutOff;
+ }
+ String lateCutOff = ((Feed) entity).getLateArrival().getCutOff()
+ .toString();
+ endTime = EntityUtil.parseDateUTC(nominalTime);
+ long feedCutOffPeriod = evaluator.evaluate(lateCutOff, Long.class);
+ endTime = addTime(endTime, (int) feedCutOffPeriod); // NOTE(review): long narrowed to int; a period above Integer.MAX_VALUE ms (about 24.8 days) overflows
+ return endTime; // feed cut-off = nominal time + cut-off period
+ } else if (entity.getEntityType() == EntityType.PROCESS) {
+ Process process = (Process) entity;
+ for (LateInput lp : process.getLateProcess().getLateInputs()) {
+ Feed feed = null;
+ String endInstanceTime = "";
+ for (Input input : process.getInputs().getInputs()) { // find the process input this late-input refers to
+ if (input.getName().equals(lp.getInput())) {
+ endInstanceTime = input.getEnd();
+ feed = store.get(EntityType.FEED, input.getFeed());
+ break;
+ }
+ }
+ if (feed.getLateArrival() == null) { // NOTE(review): feed is still null here when no input matched lp.getInput()
+ LOG.debug("Feed's " + feed.getName()
+ + " late arrival cut-off is not configured, ignoring this feed");
+ continue;
+ }
+ String lateCutOff = feed.getLateArrival().getCutOff()
+ .toString();
+ endTime = evaluator.evaluate(endInstanceTime, Date.class);
+ long feedCutOffPeriod = evaluator.evaluate(lateCutOff,
+ Long.class);
+ endTime = addTime(endTime, (int) feedCutOffPeriod); // NOTE(review): same long-to-int narrowing as in the FEED branch
+
+ if (endTime.after(feedCutOff)) { // keep the latest cut-off seen across the late inputs
+ feedCutOff = endTime;
+ }
+ }
+ return feedCutOff; // remains the epoch if every late input's feed was skipped
+ } else {
+ throw new FalconException(
+ "Invalid entity while getting cut-off time:"
+ + entity.getName());
+ }
+ }
+
+ @Override // Runs the superclass initialization, then starts the consumer thread for this handler.
+ public void init(M delayQueue) throws FalconException {
+ super.init(delayQueue);
+ Thread daemon = new Thread(new LateRerunConsumer(this)); // the consumer is constructed with this handler
+ daemon.setName("LaterunHandler");
+ daemon.setDaemon(true); // daemon thread: it does not keep the JVM alive
+ daemon.start();
+ LOG.info("Laterun Handler thread started");
+ }
+
+ public Path getLateLogPath(String logDir, String nominalTime, // Builds the path logDir + "/latedata/" + nominalTime + "/" + srcClusterName.
+ String srcClusterName) {
+ //SrcClusterName valid only in case of feed
+ return new Path(logDir + "/latedata/" + nominalTime + "/"
+ + (srcClusterName == null // a null cluster name contributes an empty final segment
+ ? "" : srcClusterName));
+
+ }
+
+ public Configuration getConfiguration(String cluster, String wfId) // Returns a new Configuration with FS_DEFAULT_NAME_KEY set from the workflow's NAME_NODE property.
+ throws FalconException {
+ Configuration conf = new Configuration();
+ conf.set(
+ CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
+ this.getWfEngine().getWorkflowProperty(cluster, wfId, // value is read from the workflow engine for this cluster and workflow id
+ AbstractWorkflowEngine.NAME_NODE));
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
index 1901890..ce76842 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RerunHandlerFactory.java
@@ -17,29 +17,31 @@
*/
package org.apache.falcon.rerun.handler;
-import org.apache.falcon.rerun.event.RerunEvent.RerunType;
import org.apache.falcon.rerun.event.LaterunEvent;
+import org.apache.falcon.rerun.event.RerunEvent.RerunType;
import org.apache.falcon.rerun.event.RetryEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
public class RerunHandlerFactory {
- private static final RetryHandler<DelayedQueue<RetryEvent>> retryHandler = new RetryHandler<DelayedQueue<RetryEvent>>();
- private static final LateRerunHandler<DelayedQueue<LaterunEvent>> lateHandler = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
+ private static final RetryHandler<DelayedQueue<RetryEvent>> retryHandler // one shared instance, created by the static initializer
+ = new RetryHandler<DelayedQueue<RetryEvent>>();
+ private static final LateRerunHandler<DelayedQueue<LaterunEvent>> lateHandler // one shared instance, created by the static initializer
+ = new LateRerunHandler<DelayedQueue<LaterunEvent>>();
- private RerunHandlerFactory() {
+ private RerunHandlerFactory() { // private constructor: the class exposes only a static accessor
- }
+ }
- public static AbstractRerunHandler getRerunHandler(RerunType type) {
- switch (type) {
- case RETRY:
- return retryHandler;
- case LATE:
- return lateHandler;
- default:
- throw new RuntimeException("Invalid handler:" + type);
- }
+ public static AbstractRerunHandler getRerunHandler(RerunType type) { // Returns the shared handler for the given type; the return type is the raw AbstractRerunHandler.
+ switch (type) {
+ case RETRY:
+ return retryHandler;
+ case LATE:
+ return lateHandler;
+ default: // any other RerunType value is rejected
+ throw new RuntimeException("Invalid handler:" + type);
+ }
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index a30d2da..c084233 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -17,82 +17,82 @@
*/
package org.apache.falcon.rerun.handler;
-import java.util.Date;
-
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.rerun.event.RetryEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.util.StartupProperties;
+import java.util.Date;
+
public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
- extends AbstractRerunConsumer<RetryEvent, T> {
+ extends AbstractRerunConsumer<RetryEvent, T> {
- public RetryConsumer(T handler) {
- super(handler);
- }
+ public RetryConsumer(T handler) { // Creates a consumer bound to the given retry handler.
+ super(handler); // the handler is passed straight to AbstractRerunConsumer
+ }
- @Override
- protected void handleRerun(String cluster, String jobStatus,
- RetryEvent message) {
- try {
- if (!jobStatus.equals("KILLED")) {
- LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
- + message.getWfId());
- message.setMsgInsertTime(System.currentTimeMillis());
- handler.offerToQueue(message);
- return;
- }
- LOG.info("Retrying attempt:"
- + (message.getRunId() + 1)
- + " out of configured: "
- + message.getAttempts()
- + " attempt for instance::"
- + message.getEntityName()
- + ":"
- + message.getInstance()
- + " And WorkflowId: "
- + message.getWfId()
- + " At time: "
- + SchemaHelper.formatDateUTC(new Date(System
- .currentTimeMillis())));
- handler.getWfEngine().reRun(message.getClusterName(),
- message.getWfId(), null);
- } catch (Exception e) {
- int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
- .getProperty("max.retry.failure.count", "1"));
- if (message.getFailRetryCount() < maxFailRetryCount) {
- LOG.warn(
- "Retrying again for process instance "
- + message.getEntityName() + ":"
- + message.getInstance() + " after "
- + message.getDelayInMilliSec()
- + " seconds as Retry failed with message:", e);
- message.setFailRetryCount(message.getFailRetryCount() + 1);
- try {
- handler.offerToQueue(message);
- } catch (Exception ex) {
- LOG.error("Unable to re-offer to queue:", ex);
- GenericAlert.alertRetryFailed(message.getEntityType(),
- message.getEntityName(), message.getInstance(),
- message.getWfId(),
- Integer.toString(message.getRunId()),
- ex.getMessage());
- }
- } else {
- LOG.warn(
- "Failure retry attempts exhausted for instance: "
- + message.getEntityName() + ":"
- + message.getInstance(), e);
- GenericAlert.alertRetryFailed(message.getEntityType(),
- message.getEntityName(), message.getInstance(),
- message.getWfId(),
- Integer.toString(message.getRunId()),
- "Failure retry attempts exhausted");
- }
+ @Override // Re-runs the workflow of a KILLED job; for any other status the event is put back on the queue.
+ protected void handleRerun(String cluster, String jobStatus, // the cluster parameter is not read; message.getClusterName() is used instead
+ RetryEvent message) {
+ try {
+ if (!jobStatus.equals("KILLED")) { // not KILLED: re-enqueue with a fresh insert time and return
+ LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
+ + message.getWfId());
+ message.setMsgInsertTime(System.currentTimeMillis());
+ handler.offerToQueue(message);
+ return;
+ }
+ LOG.info("Retrying attempt:"
+ + (message.getRunId() + 1) // logged as run id + 1
+ + " out of configured: "
+ + message.getAttempts()
+ + " attempt for instance::"
+ + message.getEntityName()
+ + ":"
+ + message.getInstance()
+ + " And WorkflowId: "
+ + message.getWfId()
+ + " At time: "
+ + SchemaHelper.formatDateUTC(new Date(System
+ .currentTimeMillis())));
+ handler.getWfEngine().reRun(message.getClusterName(), // the actual rerun request; the third argument is passed as null
+ message.getWfId(), null);
+ } catch (Exception e) { // covers failures from both the re-enqueue path and the reRun call above
+ int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
+ .getProperty("max.retry.failure.count", "1")); // falls back to "1" when the property is absent
+ if (message.getFailRetryCount() < maxFailRetryCount) {
+ LOG.warn(
+ "Retrying again for process instance "
+ + message.getEntityName() + ":"
+ + message.getInstance() + " after "
+ + message.getDelayInMilliSec() // NOTE(review): getter is named ...InMilliSec but the log text below says "seconds"
+ + " seconds as Retry failed with message:", e);
+ message.setFailRetryCount(message.getFailRetryCount() + 1);
+ try {
+ handler.offerToQueue(message);
+ } catch (Exception ex) { // the re-offer itself failed: log it and raise an alert
+ LOG.error("Unable to re-offer to queue:", ex);
+ GenericAlert.alertRetryFailed(message.getEntityType(),
+ message.getEntityName(), message.getInstance(),
+ message.getWfId(),
+ Integer.toString(message.getRunId()),
+ ex.getMessage());
+ }
+ } else { // failure-retry budget used up: log and alert, the event is not re-queued
+ LOG.warn(
+ "Failure retry attempts exhausted for instance: "
+ + message.getEntityName() + ":"
+ + message.getInstance(), e);
+ GenericAlert.alertRetryFailed(message.getEntityType(),
+ message.getEntityName(), message.getInstance(),
+ message.getWfId(),
+ Integer.toString(message.getRunId()),
+ "Failure retry attempts exhausted");
+ }
- }
+ }
- }
+ }
}