You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by so...@apache.org on 2015/10/13 01:31:14 UTC

falcon git commit: FALCON-1102 Gather data transfer details of filesystem replication. Contributed by Peeyush Bishnoi.

Repository: falcon
Updated Branches:
  refs/heads/master 08fbf4f38 -> d81820082


FALCON-1102 Gather data transfer details of filesystem replication. Contributed by Peeyush Bishnoi.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d8182008
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d8182008
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d8182008

Branch: refs/heads/master
Commit: d8182008269050e926b99c7111817d1e327c0dfd
Parents: 08fbf4f
Author: Sowmya Ramesh <sr...@hortonworks.com>
Authored: Mon Oct 12 16:31:04 2015 -0700
Committer: Sowmya Ramesh <sr...@hortonworks.com>
Committed: Mon Oct 12 16:31:04 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../resources/hdfs-replication-workflow.xml     |  2 +
 .../InstanceRelationshipGraphBuilder.java       | 30 +++++++
 .../falcon/workflow/WorkflowExecutionArgs.java  |  3 +-
 .../workflow/WorkflowExecutionContext.java      | 64 ++++++++++++++
 .../metadata/MetadataMappingServiceTest.java    | 67 +++++++++++++-
 metrics/pom.xml                                 | 27 ++++++
 .../falcon/job/FSReplicationCounters.java       | 44 ++++++++++
 .../java/org/apache/falcon/job/JobCounters.java | 92 ++++++++++++++++++++
 .../apache/falcon/job/JobCountersHandler.java   | 41 +++++++++
 .../java/org/apache/falcon/job/JobType.java     | 26 ++++++
 .../falcon/job/ReplicationJobCountersList.java  | 61 +++++++++++++
 .../falcon/job/FSReplicationCountersTest.java   | 52 +++++++++++
 .../feed/FSReplicationWorkflowBuilder.java      |  1 +
 .../feed/FeedReplicationWorkflowBuilder.java    | 25 ++++++
 .../feed/OozieFeedWorkflowBuilderTest.java      | 23 ++++-
 .../feed/fs-replication-feed-counters.xml       | 59 +++++++++++++
 replication/pom.xml                             |  4 +
 .../falcon/replication/FeedReplicator.java      | 22 ++++-
 19 files changed, 639 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff8c40e..e6c8f28 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
     FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) 
 
   NEW FEATURES
+    FALCON-1102 Gather data transfer details of filesystem replication(Peeyush Bishnoi via Sowmya Ramesh)
+
     FALCON-1316 Add supporting REST API calls for new UI(Balu Vellanki via Sowmya Ramesh)
 
     FALCON-1473 Feed SLA Miss Alerts through REST API(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
index 942421f..c1966be 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
@@ -67,6 +67,8 @@
             <arg>FILESYSTEM</arg>
             <arg>-availabilityFlag</arg>
             <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
+            <arg>-counterLogDir</arg>
+            <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}</arg>
         </java>
         <ok to="end"/>
         <error to="fail"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 016c622..f485764 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -91,9 +91,25 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             addPipelines(process.getPipelines(), processInstance);
         }
 
+        addCounters(processInstance, context);
+
         return processInstance;
     }
 
+    private void addCounters(Vertex processInstance, WorkflowExecutionContext context) throws FalconException {
+        final String serialized = getCounterString(context); // null when the context carries no counters
+        if (StringUtils.isNotBlank(serialized)) {
+            addCountersToInstance(serialized, processInstance); // copy every counter onto the vertex
+        }
+    }
+
+    private String getCounterString(WorkflowExecutionContext context) {
+        if (StringUtils.isBlank(context.getCounters())) {
+            return null; // blank and missing counters are reported alike
+        }
+        return context.getCounters();
+    }
+
     public String getProcessInstanceName(WorkflowExecutionContext context) {
         return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
     }
@@ -118,6 +134,18 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         vertex.setProperty(optionName.getName(), value);
     }
 
+    private void addCountersToInstance(String counterString, Vertex vertex) throws FalconException {
+        try {
+            for (String counter : counterString.split(",")) {
+                String[] keyVals = counter.split(":", 2); // each entry is NAME:value
+                // parseLong(null) raises NumberFormatException, so an entry without ':' is rejected as well.
+                vertex.setProperty(keyVals[0], Long.parseLong(keyVals.length > 1 ? keyVals[1] : null));
+            }
+        } catch (NumberFormatException e) {
+            throw new FalconException("Invalid values for counter:" + e, e); // keep the cause for diagnosis
+        }
+    }
+
     public void addInstanceToEntity(Vertex instanceVertex, String entityName,
                                     RelationshipType entityType, RelationshipLabel edgeLabel) {
         addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null);
@@ -200,6 +228,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
         addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
+
+        addCounters(feedInstanceVertex, context);
     }
 
     public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index d2430a2..ac7140c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -84,7 +84,8 @@ public enum WorkflowExecutionArgs {
     LOG_DIR("logDir", "log dir where lineage can be recorded"),
 
     CONTEXT_FILE("contextFile", "wf execution context file path where wf properties are recorded", false),
-    CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false);
+    CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false),
+    COUNTERS("counters", "store job counters", false);
 
 
     private final String name;

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 45b6d23..b870e3a 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.EntityType;
@@ -314,6 +315,10 @@ public class WorkflowExecutionContext {
         return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE));
     }
 
+    public String getCounters() { // delegates to getValue for the COUNTERS argument
+        return getValue(WorkflowExecutionArgs.COUNTERS);
+    }
+
     /**
      * this method is invoked from with in the workflow.
      *
@@ -383,6 +388,33 @@ public class WorkflowExecutionContext {
     }
 
 
+    public static Path getCounterFile(String logDir) { // resolves to <logDir>/counter.txt
+        return new Path(logDir, "counter.txt");
+    }
+
+    /** Reads the counter file and returns its lines joined with ','; null when the file has no lines. */
+    public static String readCounters(FileSystem fs, Path counterFile) throws IOException {
+        StringBuilder joined = new StringBuilder();
+        boolean sawLine = false;
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(counterFile)));
+        try {
+            for (String entry = reader.readLine(); entry != null; entry = reader.readLine()) {
+                // Every line after the first is preceded by a separator.
+                if (sawLine) {
+                    joined.append(",");
+                }
+                joined.append(entry);
+                sawLine = true;
+            }
+        } finally {
+            // Read failures propagate unchanged; the reader is released either way.
+            IOUtils.closeQuietly(reader);
+        }
+
+        // Separators are written between entries, so there is no trailing ',' to strip.
+        return sawLine ? joined.toString() : null;
+    }
+
     public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException {
         return create(args, type, null);
     }
@@ -408,10 +440,42 @@ public class WorkflowExecutionContext {
         executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
                 getFilePath(executionContext.getLogDir(), executionContext.getEntityName(),
                         executionContext.getEntityType(), executionContext.getOperation()));
+        addCounterToWF(executionContext);
 
         return executionContext;
     }
 
+    private static void addCounterToWF(WorkflowExecutionContext executionContext) throws FalconException {
+        if (executionContext.hasWorkflowFailed()) { // counters are only attached to runs that did not fail
+            LOG.info("Workflow Instance failed, counter will not be added: {}",
+                    executionContext.getWorkflowRunIdString());
+            return;
+        }
+
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                new Path(executionContext.getLogDir()).toUri());
+        Path counterFile = getCounterFile(executionContext.getLogDir());
+        try {
+            if (fs.exists(counterFile)) {
+                String counters = readCounters(fs, counterFile);
+                if (StringUtils.isNotBlank(counters)) {
+                    executionContext.context.put(WorkflowExecutionArgs.COUNTERS, counters);
+                }
+            }
+        } catch (IOException e) {
+            LOG.error("Error in accessing counter file: " + counterFile, e); // best effort, keep the stack trace
+        } finally {
+            try {
+                if (fs.exists(counterFile)) { // the file is removed whether or not it could be read
+                    fs.delete(counterFile, false);
+                }
+            } catch (IOException e) {
+                LOG.error("Unable to delete counter file: " + counterFile, e);
+            }
+            IOUtils.closeQuietly(fs); // always release fs, even when the cleanup above failed; close errors ignored
+        }
+    }
+
     private static CommandLine getCommand(String[] arguments) throws ParseException {
         Options options = new Options();
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 89e8178..29f933d 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -41,20 +41,23 @@ import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
-import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -62,13 +65,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
+
 /**
  * Test for Metadata relationship mapping service.
  */
 public class MetadataMappingServiceTest {
 
     public static final String FALCON_USER = "falcon-user";
-    private static final String LOGS_DIR = "/falcon/staging/feed/logs";
+    private static final String LOGS_DIR = "jail://global:00/falcon/staging/feed/logs";
     private static final String NOMINAL_TIME = "2014-01-01-01-00";
 
     public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -97,6 +102,7 @@ public class MetadataMappingServiceTest {
             "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
     public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
             "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
+    public static final String COUNTERS = "TIMETAKEN:36956,COPY:30,BYTESCOPIED:1000";
 
     public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
 
@@ -580,6 +586,26 @@ public class MetadataMappingServiceTest {
         Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
     }
 
+    @Test
+    public void testLineageForJobCounter() throws Exception {
+        setupForJobCounters(); // also writes counter.txt under LOGS_DIR before the context is created
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "IGNORE", "IGNORE", "IGNORE", "NONE"),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        Graph graph = service.getGraph();
+
+        Vertex vertex = graph.getVertices("name", "sample-process/2014-01-01T01:00Z").iterator().next();
+        Assert.assertEquals(vertex.getProperty("TIMETAKEN"), 36956L); // expected values mirror the COUNTERS constant
+        Assert.assertEquals(vertex.getProperty("COPY"), 30L);
+        Assert.assertEquals(vertex.getProperty("BYTESCOPIED"), 1000L);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 9);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 14);
+        verifyLineageGraphForJobCounters(context);
+    }
+
     private void verifyUpdatedEdges(Process newProcess) {
         Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
 
@@ -946,6 +972,13 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName());
     }
 
+    private void verifyLineageGraphForJobCounters(WorkflowExecutionContext context) throws Exception {
+        Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME,
+                RelationshipType.PROCESS_ENTITY);
+        Assert.assertEquals(processVertex.getProperty("name"), PROCESS_ENTITY_NAME);
+        Assert.assertTrue(context.getCounters().length()>0); // the context must carry a non-empty counter string
+    }
+
     private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames,
                                                String feedInstancePaths, String falconInputPaths,
                                                String falconInputFeeds) {
@@ -995,6 +1028,36 @@ public class MetadataMappingServiceTest {
         };
     }
 
+    private void setupForJobCounters() throws Exception {
+        cleanUp();
+        service.init();
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+        List<Feed> inFeeds = new ArrayList<>(); // the process is registered with empty feed lists
+        List<Feed> outFeeds = new ArrayList<>();
+
+        createJobCountersFileForTest(); // seed counter.txt under LOGS_DIR
+        // Add process
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION, inFeeds, outFeeds);
+    }
+
+    private void createJobCountersFileForTest() throws Exception {
+        Path counterFile = new Path(LOGS_DIR, "counter.txt");
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                new Path(LOGS_DIR).toUri());
+        // Open the stream before the try block: a failure here must not be masked by the finally clause.
+        OutputStream out = fs.create(counterFile);
+        try {
+            out.write(COUNTERS.getBytes());
+            out.flush();
+        } finally {
+            out.close();
+        }
+    }
+
     private void setup() throws Exception {
         cleanUp();
         service.init();

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index a0358db..36d9b50 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -32,6 +32,33 @@
     <name>Apache Falcon Metrics</name>
     <packaging>jar</packaging>
 
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
     <dependencies>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java b/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java
new file mode 100644
index 0000000..9dc7259
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/FSReplicationCounters.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Obtain and store Filesystem Replication counters from FeedReplicator job.
+ */
+public class FSReplicationCounters extends JobCounters {
+    private static final Logger LOG = LoggerFactory.getLogger(FSReplicationCounters.class);
+
+    public FSReplicationCounters() {
+        super();
+    }
+
+    @Override // implements the abstract method declared by JobCounters
+    protected void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException {
+        if (isDistCp) { // counters are collected only when the caller flags the job as DistCp
+            populateReplicationCountersMap(jobCounters);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobCounters.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCounters.java b/metrics/src/main/java/org/apache/falcon/job/JobCounters.java
new file mode 100644
index 0000000..275fbd5
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/JobCounters.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.mapred.CopyMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Job Counters abstract class to be extended by supported job type.
+ */
+public abstract class JobCounters {
+    private static final Logger LOG = LoggerFactory.getLogger(JobCounters.class);
+    protected Map<String, Long> countersMap = null; // counter name -> value, created by the constructor
+
+    public JobCounters() {
+        countersMap = new HashMap<String, Long>();
+    }
+
+    public void obtainJobCounters(Configuration conf, Job job, boolean isDistCp) throws IOException {
+        try {
+            long timeTaken = job.getFinishTime() - job.getStartTime();
+            countersMap.put(ReplicationJobCountersList.TIMETAKEN.getName(), timeTaken);
+            Counters jobCounters = job.getCounters();
+            parseJob(job, jobCounters, isDistCp); // subclass decides which counters to record
+        } catch (Exception e) {
+            LOG.info("Exception occurred while obtaining job counters", e); // best effort: log with stack trace
+        }
+    }
+
+    protected void populateReplicationCountersMap(Counters jobCounters) {
+        for (CopyMapper.Counter copyCounterVal : CopyMapper.Counter.values()) {
+            if (ReplicationJobCountersList.getCountersKey(copyCounterVal.name()) != null) { // listed counters only
+                Counter counter = jobCounters.findCounter(copyCounterVal);
+                if (counter != null) {
+                    String counterName = counter.getName();
+                    long counterValue = counter.getValue();
+                    countersMap.put(counterName, counterValue);
+                }
+            }
+        }
+    }
+
+    public void storeJobCounters(Configuration conf, Path counterFile) throws IOException {
+        FileSystem counterFs = counterFile.getFileSystem(conf); // honours a scheme/authority given in the path
+        OutputStream out = null;
+        try {
+            out = counterFs.create(counterFile);
+            for (Map.Entry<String, Long> counter : countersMap.entrySet()) {
+                out.write((counter.getKey() + ":" + counter.getValue()).getBytes()); // one NAME:value per line
+                out.write("\n".getBytes());
+            }
+            out.flush();
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    public Map<String, Long> getCountersMap() {
+        return countersMap; // the live map, not a copy
+    }
+
+    protected abstract void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
new file mode 100644
index 0000000..e8b68ff
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job counters handler to initialize the required concrete class for obtaining job counters.
+ */
+public final class JobCountersHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(JobCountersHandler.class);
+    private JobCountersHandler() {
+    }
+
+    public static JobCounters getCountersType(String jobType) {
+        if (JobType.FSREPLICATION.name().equals(jobType)) { // constant first: a null jobType is just unsupported
+            return new FSReplicationCounters();
+        }
+
+        LOG.error("JobType is not supported:" + jobType);
+
+        return null; // callers must handle an unsupported job type
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/JobType.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobType.java b/metrics/src/main/java/org/apache/falcon/job/JobType.java
new file mode 100644
index 0000000..456e57f
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/JobType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+/**
+ * Types of job for which counters can be obtained.
+ */
+public enum JobType {
+    FSREPLICATION // mapped to FSReplicationCounters by JobCountersHandler
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java b/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java
new file mode 100644
index 0000000..d8c3377
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/ReplicationJobCountersList.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+/**
+ * List of counters for replication job.
+ */
+public enum ReplicationJobCountersList {
+    TIMETAKEN("TIMETAKEN", "time taken by the distcp job"),
+    BYTESCOPIED("BYTESCOPIED", "number of bytes copied"),
+    COPY("COPY", "number of files copied");
+
+    private final String name;
+    private final String description;
+
+    ReplicationJobCountersList(String counterName, String counterDescription) {
+        name = counterName;
+        description = counterDescription;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public static ReplicationJobCountersList getCountersKey(String counterKey) {
+        if (counterKey == null) {
+            return null;
+        }
+        for (ReplicationJobCountersList candidate : values()) {
+            if (candidate.name.equals(counterKey)) {
+                return candidate;
+            }
+        }
+        return null; // no constant carries that name
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java
----------------------------------------------------------------------
diff --git a/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java b/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java
new file mode 100644
index 0000000..abe8379
--- /dev/null
+++ b/metrics/src/test/java/org/apache/falcon/job/FSReplicationCountersTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for FS Replication Counters.
+ *
+ * Verifies that every counter key used in the sample arguments resolves to a
+ * {@link ReplicationJobCountersList} constant and that no constant is left
+ * without a sample argument.
+ */
+public class FSReplicationCountersTest {
+    private List<String> countersList = new ArrayList<String>();
+    private final String[] countersArgs = new String[] { "TIMETAKEN:5000", "BYTESCOPIED:1000L", "COPY:1" };
+
+    /**
+     * Collects the counter key (the text before ':') of every sample argument.
+     */
+    @BeforeClass
+    public void setUp() throws Exception {
+        for (String counters : countersArgs) {
+            String countersKey = counters.split(":")[0];
+            countersList.add(countersKey);
+        }
+    }
+
+    @Test
+    public void testObtainJobCounters() throws Exception {
+        // Use the keys prepared in setUp() instead of splitting the arguments a second time.
+        for (String countersKey : countersList) {
+            ReplicationJobCountersList counter = ReplicationJobCountersList.getCountersKey(countersKey);
+            // Fail with a readable message rather than an NPE when a key is unknown.
+            Assert.assertNotNull(counter, "No replication counter defined for key " + countersKey);
+            // TestNG order is (actual, expected).
+            Assert.assertEquals(counter.getName(), countersKey);
+        }
+
+        Assert.assertEquals(ReplicationJobCountersList.values().length, countersArgs.length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index b82f4e0..0dc09ee 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -56,6 +56,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
         addHDFSServersConfig(replication, src, target);
         addAdditionalReplicationProperties(replication);
+        enableCounters(replication);
         addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(replication);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index a7c19cd..5a62130 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
@@ -37,6 +38,7 @@ import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -47,11 +49,24 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
     protected static final String REPLICATION_ACTION_NAME = "replication";
     private static final String MR_MAX_MAPS = "maxMaps";
     private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
+    private static final String REPLICATION_JOB_COUNTER = "job.counter";
 
     public FeedReplicationWorkflowBuilder(Feed entity) {
         super(entity, LifeCycle.REPLICATION);
     }
 
+    /**
+     * Checks whether job counter collection has been requested for this feed.
+     *
+     * @return true when the feed declares a property named "job.counter" whose
+     *         value equals "true" ignoring case; false otherwise, including when
+     *         the feed has no properties section
+     * @throws FalconException declared for callers; this implementation does not throw it itself
+     */
+    public boolean isCounterEnabled() throws FalconException {
+        if (entity.getProperties() == null) {
+            return false;
+        }
+
+        List<Property> propertyList = entity.getProperties().getProperties();
+        for (Property prop : propertyList) {
+            // Constant-first comparison so that a property without a name cannot trigger an NPE.
+            if (REPLICATION_JOB_COUNTER.equals(prop.getName()) && "true".equalsIgnoreCase(prop.getValue())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
         Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName());
 
@@ -99,6 +114,16 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
         }
         return action;
     }
+
+    /**
+     * Appends the counter log directory argument to the java action when job
+     * counters are enabled for the feed; otherwise the action is left untouched.
+     *
+     * @param action replication action whose java arguments may be extended
+     * @return the same action instance that was passed in
+     * @throws FalconException propagated from {@link #isCounterEnabled()}
+     */
+    protected ACTION enableCounters(ACTION action) throws FalconException {
+        if (!isCounterEnabled()) {
+            return action;
+        }
+
+        List<String> javaArgs = action.getJava().getArg();
+        javaArgs.add("-counterLogDir");
+        javaArgs.add("${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}");
+        return action;
+    }
+
     protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index cfce1ae..5e93027 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -91,6 +91,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     private Feed fsReplFeed;
     private Feed lifecycleRetentionFeed;
     private Feed retentionFeed;
+    private Feed fsReplFeedCounter;
 
     private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml";
     private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml";
@@ -99,6 +100,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml";
     private static final String FS_RETENTION_LIFECYCLE_FEED = "/feed/fs-retention-lifecycle-feed.xml";
     private static final String FS_RETENTION_ORIG_FEED = "/feed/fs-retention-feed.xml";
+    private static final String FS_REPLICATION_FEED_COUNTER = "/feed/fs-replication-feed-counters.xml";
 
     @BeforeClass
     public void setUpDFS() throws Exception {
@@ -129,6 +131,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         feed = (Feed) storeEntity(EntityType.FEED, FEED);
         fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED);
+        fsReplFeedCounter = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED_COUNTER);
         tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED);
         lifecycleRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_LIFECYCLE_FEED);
         retentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_ORIG_FEED);
@@ -336,6 +339,18 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster, pathsWithPartitions);
     }
 
+    /**
+     * Builds the replication coordinator for the feed loaded from the counters
+     * fixture and checks its validity window and the generated replication workflow.
+     */
+    @Test
+    public void testReplicationWithCounters() throws Exception {
+        OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeedCounter, Tag.REPLICATION);
+        List<Properties> coordProps = builder.buildCoords(alphaTrgCluster, new Path("/alpha/falcon"));
+        String coordPath = coordProps.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH);
+        final COORDINATORAPP counterCoord = getCoordinator(trgMiniDFS, coordPath);
+
+        Assert.assertEquals(counterCoord.getStart(), "2012-10-01T12:05Z");
+        Assert.assertEquals(counterCoord.getEnd(), "2012-10-01T12:11Z");
+
+        String partitionedPaths = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeedCounter);
+        assertReplCoord(counterCoord, fsReplFeedCounter, alphaTrgCluster, partitionedPaths);
+    }
+
     private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
                                           Feed aFeed) throws FalconException {
         String srcPart = FeedHelper.normalizePartitionExpression(
@@ -363,12 +378,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
 
         WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
-        assertWorkflowDefinition(fsReplFeed, workflow, false);
+        assertWorkflowDefinition(aFeed, workflow, false);
 
         ACTION replicationActionNode = getAction(workflow, "replication");
         JAVA replication = replicationActionNode.getJava();
         List<String> args = replication.getArg();
-        Assert.assertEquals(args.size(), 15);
+        if (args.contains("-counterLogDir")) {
+            Assert.assertEquals(args.size(), 17);
+        } else {
+            Assert.assertEquals(args.size(), 15);
+        }
 
         HashMap<String, String> props = getCoordProperties(coord);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
new file mode 100644
index 0000000..230e2b0
--- /dev/null
+++ b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
@@ -0,0 +1,59 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="billing RC File" name="replication-test-counter" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" type="source" name="corp1">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+        </cluster>
+        <cluster type="target" name="alpha">
+            <validity end="2012-10-01T12:11Z" start="2012-10-01T12:05Z"/>
+            <retention action="delete" limit="days(10000)"/>
+            <locations>
+                <location path="/localDC/rc/billing/ua1/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location path="/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/" type="data"/>
+        <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+        <location path="/data/regression/fetlrc/billing/metadata" type="meta"/>
+    </locations>
+
+    <ACL permission="0x755" group="group" owner="fetl"/>
+    <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+    <properties>
+        <property name="maxMaps" value="33" />
+        <property name="mapBandwidth" value="2" />
+        <!-- "job.counter" set to "true" is the property FeedReplicationWorkflowBuilder#isCounterEnabled
+             looks for before the -counterLogDir argument is added to the replication action. -->
+        <property name="job.counter" value="true" />
+    </properties>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 3cc96fc..78c50f3 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -59,6 +59,10 @@
             <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-metrics</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8182008/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index a226058..e97e84e 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -27,12 +27,17 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.job.JobCountersHandler;
+import org.apache.falcon.job.JobType;
+import org.apache.falcon.job.JobCounters;
 import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.util.Tool;
@@ -89,7 +94,18 @@ public class FeedReplicator extends Configured implements Tool {
                 ? new CustomReplicator(conf, options)
                 : new DistCp(conf, options);
         LOG.info("Started DistCp");
-        distCp.execute();
+        Job job = distCp.execute();
+
+        if (cmd.hasOption("counterLogDir")
+                && job.getStatus().getState() == JobStatus.State.SUCCEEDED) {
+            LOG.info("Gathering counters for the the Feed Replication job");
+            Path counterFile = new Path(cmd.getOptionValue("counterLogDir"), "counter.txt");
+            JobCounters fsReplicationCounters = JobCountersHandler.getCountersType(JobType.FSREPLICATION.name());
+            if (fsReplicationCounters != null) {
+                fsReplicationCounters.obtainJobCounters(conf, job, true);
+                fsReplicationCounters.storeJobCounters(conf, counterFile);
+            }
+        }
 
         if (includePathSet) {
             executePostProcessing(conf, options);  // this only applies for FileSystem Storage.
@@ -161,6 +177,10 @@ public class FeedReplicator extends Configured implements Tool {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("counterLogDir", true, "log directory to store job counter file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return new GnuParser().parse(options, args);
     }