You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by so...@apache.org on 2015/10/13 01:31:14 UTC
falcon git commit: FALCON-1102 Gather data transfer details of
filesystem replication. Contributed by Peeyush Bishnoi.
Repository: falcon
Updated Branches:
refs/heads/master 08fbf4f38 -> d81820082
FALCON-1102 Gather data transfer details of filesystem replication. Contributed by Peeyush Bishnoi.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d8182008
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d8182008
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d8182008
Branch: refs/heads/master
Commit: d8182008269050e926b99c7111817d1e327c0dfd
Parents: 08fbf4f
Author: Sowmya Ramesh <sr...@hortonworks.com>
Authored: Mon Oct 12 16:31:04 2015 -0700
Committer: Sowmya Ramesh <sr...@hortonworks.com>
Committed: Mon Oct 12 16:31:04 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../resources/hdfs-replication-workflow.xml | 2 +
.../InstanceRelationshipGraphBuilder.java | 30 +++++++
.../falcon/workflow/WorkflowExecutionArgs.java | 3 +-
.../workflow/WorkflowExecutionContext.java | 64 ++++++++++++++
.../metadata/MetadataMappingServiceTest.java | 67 +++++++++++++-
metrics/pom.xml | 27 ++++++
.../falcon/job/FSReplicationCounters.java | 44 ++++++++++
.../java/org/apache/falcon/job/JobCounters.java | 92 ++++++++++++++++++++
.../apache/falcon/job/JobCountersHandler.java | 41 +++++++++
.../java/org/apache/falcon/job/JobType.java | 26 ++++++
.../falcon/job/ReplicationJobCountersList.java | 61 +++++++++++++
.../falcon/job/FSReplicationCountersTest.java | 52 +++++++++++
.../feed/FSReplicationWorkflowBuilder.java | 1 +
.../feed/FeedReplicationWorkflowBuilder.java | 25 ++++++
.../feed/OozieFeedWorkflowBuilderTest.java | 23 ++++-
.../feed/fs-replication-feed-counters.xml | 59 +++++++++++++
replication/pom.xml | 4 +
.../falcon/replication/FeedReplicator.java | 22 ++++-
19 files changed, 639 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff8c40e..e6c8f28 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao)
NEW FEATURES
+ FALCON-1102 Gather data transfer details of filesystem replication(Peeyush Bishnoi via Sowmya Ramesh)
+
FALCON-1316 Add supporting REST API calls for new UI(Balu Vellanki via Sowmya Ramesh)
FALCON-1473 Feed SLA Miss Alerts through REST API(Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
index 942421f..c1966be 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
@@ -67,6 +67,8 @@
<arg>FILESYSTEM</arg>
<arg>-availabilityFlag</arg>
<arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
+ <arg>-counterLogDir</arg>
+ <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}</arg>
</java>
<ok to="end"/>
<error to="fail"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 016c622..f485764 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -91,9 +91,25 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
addPipelines(process.getPipelines(), processInstance);
}
+ addCounters(processInstance, context);
+
return processInstance;
}
+ private void addCounters(Vertex processInstance, WorkflowExecutionContext context) throws FalconException {
+ String counterString = getCounterString(context);
+ if (!StringUtils.isBlank(counterString)) {
+ addCountersToInstance(counterString, processInstance);
+ }
+ }
+
+ private String getCounterString(WorkflowExecutionContext context) {
+ if (!StringUtils.isBlank(context.getCounters())) {
+ return context.getCounters();
+ }
+ return null;
+ }
+
public String getProcessInstanceName(WorkflowExecutionContext context) {
return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
}
@@ -118,6 +134,18 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
vertex.setProperty(optionName.getName(), value);
}
+ private void addCountersToInstance(String counterString, Vertex vertex) throws FalconException {
+ String[] counterKeyValues = counterString.split(",");
+ try {
+ for (String counter : counterKeyValues) {
+ String[] keyVals = counter.split(":", 2);
+ vertex.setProperty(keyVals[0], Long.parseLong(keyVals[1]));
+ }
+ } catch (NumberFormatException e) {
+ throw new FalconException("Invalid values for counter:" + e);
+ }
+ }
+
public void addInstanceToEntity(Vertex instanceVertex, String entityName,
RelationshipType entityType, RelationshipLabel edgeLabel) {
addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null);
@@ -200,6 +228,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
+
+ addCounters(feedInstanceVertex, context);
}
public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index d2430a2..ac7140c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -84,7 +84,8 @@ public enum WorkflowExecutionArgs {
LOG_DIR("logDir", "log dir where lineage can be recorded"),
CONTEXT_FILE("contextFile", "wf execution context file path where wf properties are recorded", false),
- CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false);
+ CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false),
+ COUNTERS("counters", "store job counters", false);
private final String name;
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 45b6d23..b870e3a 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
@@ -314,6 +315,10 @@ public class WorkflowExecutionContext {
return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE));
}
+ public String getCounters() {
+ return getValue(WorkflowExecutionArgs.COUNTERS);
+ }
+
/**
* this method is invoked from with in the workflow.
*
@@ -383,6 +388,33 @@ public class WorkflowExecutionContext {
}
+ public static Path getCounterFile(String logDir) {
+ return new Path(logDir, "counter.txt");
+ }
+
+ public static String readCounters(FileSystem fs, Path counterFile) throws IOException{
+ StringBuilder counterBuffer = new StringBuilder();
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(counterFile)));
+ try {
+ String line;
+ while ((line = in.readLine()) != null) {
+ counterBuffer.append(line);
+ counterBuffer.append(",");
+ }
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+
+ String counterString = counterBuffer.toString();
+ if (StringUtils.isNotBlank(counterString) && counterString.length() > 0) {
+ return counterString.substring(0, counterString.length() - 1);
+ } else {
+ return null;
+ }
+ }
+
public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException {
return create(args, type, null);
}
@@ -408,10 +440,42 @@ public class WorkflowExecutionContext {
executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
getFilePath(executionContext.getLogDir(), executionContext.getEntityName(),
executionContext.getEntityType(), executionContext.getOperation()));
+ addCounterToWF(executionContext);
return executionContext;
}
+ private static void addCounterToWF(WorkflowExecutionContext executionContext) throws FalconException {
+ if (executionContext.hasWorkflowFailed()) {
+ LOG.info("Workflow Instance failed, counter will not be added: {}",
+ executionContext.getWorkflowRunIdString());
+ return;
+ }
+
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+ new Path(executionContext.getLogDir()).toUri());
+ Path counterFile = getCounterFile(executionContext.getLogDir());
+ try {
+ if (fs.exists(counterFile)) {
+ String counters = readCounters(fs, counterFile);
+ if (StringUtils.isNotBlank(counters)) {
+ executionContext.context.put(WorkflowExecutionArgs.COUNTERS, counters);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Error in accessing counter file :" + e);
+ } finally {
+ try {
+ if (fs.exists(counterFile)) {
+ fs.delete(counterFile, false);
+ }
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Unable to delete counter file: {}", e);
+ }
+ }
+ }
+
private static CommandLine getCommand(String[] arguments) throws ParseException {
Options options = new Options();
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 89e8178..29f933d 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -41,20 +41,23 @@ import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
-import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -62,13 +65,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
+
/**
* Test for Metadata relationship mapping service.
*/
public class MetadataMappingServiceTest {
public static final String FALCON_USER = "falcon-user";
- private static final String LOGS_DIR = "/falcon/staging/feed/logs";
+ private static final String LOGS_DIR = "jail://global:00/falcon/staging/feed/logs";
private static final String NOMINAL_TIME = "2014-01-01-01-00";
public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -97,6 +102,7 @@ public class MetadataMappingServiceTest {
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
"jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
+ public static final String COUNTERS = "TIMETAKEN:36956,COPY:30,BYTESCOPIED:1000";
public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
@@ -580,6 +586,26 @@ public class MetadataMappingServiceTest {
Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
}
+ @Test
+ public void testLineageForJobCounter() throws Exception {
+ setupForJobCounters();
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "IGNORE", "IGNORE", "IGNORE", "NONE"),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+ debug(service.getGraph());
+ GraphUtils.dump(service.getGraph());
+ Graph graph = service.getGraph();
+
+ Vertex vertex = graph.getVertices("name", "sample-process/2014-01-01T01:00Z").iterator().next();
+ Assert.assertEquals(vertex.getProperty("TIMETAKEN"), 36956L);
+ Assert.assertEquals(vertex.getProperty("COPY"), 30L);
+ Assert.assertEquals(vertex.getProperty("BYTESCOPIED"), 1000L);
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 9);
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 14);
+ verifyLineageGraphForJobCounters(context);
+ }
+
private void verifyUpdatedEdges(Process newProcess) {
Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
@@ -946,6 +972,13 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName());
}
+ private void verifyLineageGraphForJobCounters(WorkflowExecutionContext context) throws Exception {
+ Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME,
+ RelationshipType.PROCESS_ENTITY);
+ Assert.assertEquals(processVertex.getProperty("name"), PROCESS_ENTITY_NAME);
+ Assert.assertTrue(context.getCounters().length()>0);
+ }
+
private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames,
String feedInstancePaths, String falconInputPaths,
String falconInputFeeds) {
@@ -995,6 +1028,36 @@ public class MetadataMappingServiceTest {
};
}
+ private void setupForJobCounters() throws Exception {
+ cleanUp();
+ service.init();
+ // Add cluster
+ clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+ "classification=production");
+ List<Feed> inFeeds = new ArrayList<>();
+ List<Feed> outFeeds = new ArrayList<>();
+
+ createJobCountersFileForTest();
+ // Add process
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION, inFeeds, outFeeds);
+ }
+
+ private void createJobCountersFileForTest() throws Exception {
+ Path counterFile = new Path(LOGS_DIR, "counter.txt");
+ OutputStream out = null;
+ try {
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+ new Path(LOGS_DIR).toUri());
+ out = fs.create(counterFile);
+ out.write(COUNTERS.getBytes());
+ out.flush();
+ } finally {
+ out.close();
+ }
+ }
+
private void setup() throws Exception {
cleanUp();
service.init();
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index a0358db..36d9b50 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -32,6 +32,33 @@
<name>Apache Falcon Metrics</name>
<packaging>jar</packaging>
+ <profiles>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java b/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java
new file mode 100644
index 0000000..9dc7259
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Obtain and store Filesystem Replication counters from FeedReplicator job.
+ */
+public class FSReplicationCounters extends JobCounters {
+ private static final Logger LOG = LoggerFactory.getLogger(FSReplicationCounters.class);
+
+ public FSReplicationCounters() {
+ super();
+ }
+
+
+ protected void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException {
+ if (isDistCp) {
+ populateReplicationCountersMap(jobCounters);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobCounters.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCounters.java b/metrics/src/main/java/org/apache/falcon/job/JobCounters.java
new file mode 100644
index 0000000..275fbd5
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/JobCounters.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.mapred.CopyMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Job Counters abstract class to be extended by supported job type.
+ */
+public abstract class JobCounters {
+ private static final Logger LOG = LoggerFactory.getLogger(JobCounters.class);
+ protected Map<String, Long> countersMap = null;
+
+ public JobCounters() {
+ countersMap = new HashMap<String, Long>();
+ }
+
+ public void obtainJobCounters(Configuration conf, Job job, boolean isDistCp) throws IOException {
+ try {
+ long timeTaken = job.getFinishTime() - job.getStartTime();
+ countersMap.put(ReplicationJobCountersList.TIMETAKEN.getName(), timeTaken);
+ Counters jobCounters = job.getCounters();
+ parseJob(job, jobCounters, isDistCp);
+ } catch (Exception e) {
+ LOG.info("Exception occurred while obtaining job counters: {}", e);
+ }
+ }
+
+ protected void populateReplicationCountersMap(Counters jobCounters) {
+ for(CopyMapper.Counter copyCounterVal : CopyMapper.Counter.values()) {
+ if (ReplicationJobCountersList.getCountersKey(copyCounterVal.name()) != null) {
+ Counter counter = jobCounters.findCounter(copyCounterVal);
+ if (counter != null) {
+ String counterName = counter.getName();
+ long counterValue = counter.getValue();
+ countersMap.put(counterName, counterValue);
+ }
+ }
+ }
+ }
+
+ public void storeJobCounters(Configuration conf, Path counterFile) throws IOException {
+ FileSystem sourceFs = FileSystem.get(conf);
+ OutputStream out = null;
+ try {
+ out = sourceFs.create(counterFile);
+ for (Map.Entry<String, Long> counter : countersMap.entrySet()) {
+ out.write((counter.getKey() + ":" + counter.getValue()).getBytes());
+ out.write("\n".getBytes());
+ }
+ out.flush();
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ public Map<String, Long> getCountersMap() {
+ return countersMap;
+ }
+
+ protected abstract void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
new file mode 100644
index 0000000..e8b68ff
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job counters handler to initialize the required concrete class for obtaining job counters.
+ */
+public final class JobCountersHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(JobCountersHandler.class);
+ private JobCountersHandler() {
+ }
+
+ public static JobCounters getCountersType(String jobType) {
+ if (jobType.equals(JobType.FSREPLICATION.name())) {
+ return new FSReplicationCounters();
+ }
+
+ LOG.error("JobType is not supported:" + jobType);
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobType.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobType.java b/metrics/src/main/java/org/apache/falcon/job/JobType.java
new file mode 100644
index 0000000..456e57f
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/JobType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+/**
+ * Types of the job for which counters need to obtain.
+ */
+public enum JobType {
+ FSREPLICATION
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java b/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java
new file mode 100644
index 0000000..d8c3377
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+/**
+ * List of counters for replication job.
+ */
+public enum ReplicationJobCountersList {
+ TIMETAKEN("TIMETAKEN", "time taken by the distcp job"),
+ BYTESCOPIED("BYTESCOPIED", "number of bytes copied"),
+ COPY("COPY", "number of files copied");
+
+ private final String name;
+ private final String description;
+
+ ReplicationJobCountersList(String name, String description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public static ReplicationJobCountersList getCountersKey(String counterKey) {
+ if (counterKey != null) {
+ for (ReplicationJobCountersList value : ReplicationJobCountersList.values()) {
+ if (counterKey.equals(value.getName())) {
+ return value;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java
----------------------------------------------------------------------
diff --git a/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java b/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java
new file mode 100644
index 0000000..abe8379
--- /dev/null
+++ b/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for FS Replication Counters.
+ */
+public class FSReplicationCountersTest {
+ private List<String> countersList = new ArrayList<String>();
+ private final String[] countersArgs = new String[] { "TIMETAKEN:5000", "BYTESCOPIED:1000L", "COPY:1" };
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ for (String counters : countersArgs) {
+ String countersKey = counters.split(":")[0];
+ countersList.add(countersKey);
+ }
+ }
+
+ @Test
+ public void testObtainJobCounters() throws Exception {
+ for (String counters : countersArgs) {
+ String countersKey = counters.split(":")[0];
+ Assert.assertEquals(countersKey, ReplicationJobCountersList.getCountersKey(countersKey).getName());
+ }
+
+ Assert.assertEquals(countersArgs.length, ReplicationJobCountersList.values().length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index b82f4e0..0dc09ee 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -56,6 +56,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
addHDFSServersConfig(replication, src, target);
addAdditionalReplicationProperties(replication);
+ enableCounters(replication);
addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(replication);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index a7c19cd..5a62130 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
@@ -37,6 +38,7 @@ import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
+import java.util.List;
import java.util.Properties;
/**
@@ -47,11 +49,24 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
protected static final String REPLICATION_ACTION_NAME = "replication";
private static final String MR_MAX_MAPS = "maxMaps";
private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
+ private static final String REPLICATION_JOB_COUNTER = "job.counter";
public FeedReplicationWorkflowBuilder(Feed entity) {
super(entity, LifeCycle.REPLICATION);
}
+ public boolean isCounterEnabled() throws FalconException {
+ if (entity.getProperties() != null) {
+ List<Property> propertyList = entity.getProperties().getProperties();
+ for (Property prop : propertyList) {
+ if (prop.getName().equals(REPLICATION_JOB_COUNTER) && "true".equalsIgnoreCase(prop.getValue())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName());
@@ -99,6 +114,16 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
}
return action;
}
+
+ protected ACTION enableCounters(ACTION action) throws FalconException {
+ if (isCounterEnabled()) {
+ List<String> args = action.getJava().getArg();
+ args.add("-counterLogDir");
+ args.add("${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}");
+ }
+ return action;
+ }
+
protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index cfce1ae..5e93027 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -91,6 +91,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
private Feed fsReplFeed;
private Feed lifecycleRetentionFeed;
private Feed retentionFeed;
+ private Feed fsReplFeedCounter;
private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml";
private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml";
@@ -99,6 +100,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml";
private static final String FS_RETENTION_LIFECYCLE_FEED = "/feed/fs-retention-lifecycle-feed.xml";
private static final String FS_RETENTION_ORIG_FEED = "/feed/fs-retention-feed.xml";
+ private static final String FS_REPLICATION_FEED_COUNTER = "/feed/fs-replication-feed-counters.xml";
@BeforeClass
public void setUpDFS() throws Exception {
@@ -129,6 +131,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
feed = (Feed) storeEntity(EntityType.FEED, FEED);
fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED);
+ fsReplFeedCounter = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED_COUNTER);
tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED);
lifecycleRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_LIFECYCLE_FEED);
retentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_ORIG_FEED);
@@ -336,6 +339,18 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster, pathsWithPartitions);
}
+ @Test
+ public void testReplicationWithCounters() throws Exception {
+ OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeedCounter, Tag.REPLICATION);
+ List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster, new Path("/alpha/falcon"));
+ final COORDINATORAPP alphaCoord = getCoordinator(trgMiniDFS,
+ alphaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
+ Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
+ Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
+ String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeedCounter);
+ assertReplCoord(alphaCoord, fsReplFeedCounter, alphaTrgCluster, pathsWithPartitions);
+ }
+
private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
Feed aFeed) throws FalconException {
String srcPart = FeedHelper.normalizePartitionExpression(
@@ -363,12 +378,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
- assertWorkflowDefinition(fsReplFeed, workflow, false);
+ assertWorkflowDefinition(aFeed, workflow, false);
ACTION replicationActionNode = getAction(workflow, "replication");
JAVA replication = replicationActionNode.getJava();
List<String> args = replication.getArg();
- Assert.assertEquals(args.size(), 15);
+ if (args.contains("-counterLogDir")) {
+ Assert.assertEquals(args.size(), 17);
+ } else {
+ Assert.assertEquals(args.size(), 15);
+ }
HashMap<String, String> props = getCoordProperties(coord);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
new file mode 100644
index 0000000..230e2b0
--- /dev/null
+++ b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
@@ -0,0 +1,59 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="billing RC File" name="replication-test-counter" xmlns="uri:falcon:feed:0.1">
+ <partitions>
+ <partition name="colo"/>
+ <partition name="eventTime"/>
+ <partition name="impressionHour"/>
+ <partition name="pricingModel"/>
+ </partitions>
+
+ <groups>online,bi</groups>
+
+ <frequency>minutes(5)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="minutes(1)"/>
+
+ <clusters>
+ <cluster partition="${cluster.colo}" type="source" name="corp1">
+ <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+ <retention action="delete" limit="days(10000)"/>
+ </cluster>
+ <cluster type="target" name="alpha">
+ <validity end="2012-10-01T12:11Z" start="2012-10-01T12:05Z"/>
+ <retention action="delete" limit="days(10000)"/>
+ <locations>
+ <location path="/localDC/rc/billing/ua1/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+ </locations>
+ </cluster>
+ </clusters>
+
+ <locations>
+ <location path="/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+ <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+ <location path="/data/regression/fetlrc/billing/metadata" type="meta"/>
+ </locations>
+
+ <ACL permission="0x755" group="group" owner="fetl"/>
+ <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+ <properties>
+ <property name="maxMaps" value="33" />
+ <property name="mapBandwidth" value="2" />
+ <property name="job.counter" value="true" />
+ </properties>
+</feed>
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 3cc96fc..78c50f3 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -59,6 +59,10 @@
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-metrics</artifactId>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index a226058..e97e84e 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -27,12 +27,17 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.job.JobCountersHandler;
+import org.apache.falcon.job.JobType;
+import org.apache.falcon.job.JobCounters;
import org.apache.falcon.util.ReplicationDistCpOption;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.util.Tool;
@@ -89,7 +94,18 @@ public class FeedReplicator extends Configured implements Tool {
? new CustomReplicator(conf, options)
: new DistCp(conf, options);
LOG.info("Started DistCp");
- distCp.execute();
+ Job job = distCp.execute();
+
+ if (cmd.hasOption("counterLogDir")
+ && job.getStatus().getState() == JobStatus.State.SUCCEEDED) {
+ LOG.info("Gathering counters for the the Feed Replication job");
+ Path counterFile = new Path(cmd.getOptionValue("counterLogDir"), "counter.txt");
+ JobCounters fsReplicationCounters = JobCountersHandler.getCountersType(JobType.FSREPLICATION.name());
+ if (fsReplicationCounters != null) {
+ fsReplicationCounters.obtainJobCounters(conf, job, true);
+ fsReplicationCounters.storeJobCounters(conf, counterFile);
+ }
+ }
if (includePathSet) {
executePostProcessing(conf, options); // this only applies for FileSystem Storage.
@@ -161,6 +177,10 @@ public class FeedReplicator extends Configured implements Tool {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("counterLogDir", true, "log directory to store job counter file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return new GnuParser().parse(options, args);
}