You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/11/14 19:57:54 UTC

git commit: SQOOP-671: Mapreduce counters are not used in generated mapreduce jobs

Updated Branches:
  refs/heads/sqoop2 e8869ab34 -> 2c9a4eb46


SQOOP-671: Mapreduce counters are not used in generated mapreduce jobs

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2c9a4eb4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2c9a4eb4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2c9a4eb4

Branch: refs/heads/sqoop2
Commit: 2c9a4eb46c8e51834be946439b40e3116203581a
Parents: e8869ab
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Nov 14 10:56:53 2012 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Nov 14 10:56:53 2012 -0800

----------------------------------------------------------------------
 .../sqoop/submission/counter/SqoopCounters.java    |   25 +++++++++++++++
 .../connector/jdbc/GenericJdbcImportExtractor.java |    9 +++++-
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |    4 ++-
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |    5 +++
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    5 +++
 .../java/org/apache/sqoop/job/etl/Extractor.java   |   13 ++++++++
 6 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java
new file mode 100644
index 0000000..75f3980
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.submission.counter;
+
+/**
+ * Counters that Sqoop reports from its generated MapReduce jobs.
+ */
+public enum SqoopCounters {
+  ROWS_READ;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
index 1b3fcff..b856ce6 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
@@ -32,6 +32,7 @@ public class GenericJdbcImportExtractor extends Extractor {
 
  public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class);
 
+ private long rowsRead = 0;
   @Override
   public void run(ImmutableContext context, Object connectionC, Object jobC, Partition partition, DataWriter writer) {
     String driver = context.getString(
@@ -52,6 +53,7 @@ public class GenericJdbcImportExtractor extends Extractor {
     query = query.replace(
         GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
     LOG.debug("Using query: " + query);
+    rowsRead = 0;
     ResultSet resultSet = executor.executeQuery(query);
 
     try {
@@ -63,8 +65,8 @@ public class GenericJdbcImportExtractor extends Extractor {
           array[i] = resultSet.getObject(i+1);
         }
         writer.writeArrayRecord(array);
+        rowsRead++;
       }
-
     } catch (SQLException e) {
       throw new SqoopException(
           GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
@@ -74,4 +76,9 @@ public class GenericJdbcImportExtractor extends Extractor {
     }
   }
 
+  @Override
+  public long getRowsRead() {
+    return rowsRead;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 1c0f3aa..fcedf52 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -31,6 +31,7 @@ import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.utils.ClassUtils;
 
 /**
@@ -75,7 +76,8 @@ public class SqoopMapper
     try {
       extractor.run(subContext, configConnection, configJob, split.getPartition(),
         new MapDataWriter(context));
-
+      context.getCounter(SqoopCounters.ROWS_READ)
+              .increment(extractor.getRowsRead());
     } catch (Exception e) {
       throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 875a123..2287b06 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -230,5 +230,10 @@ public class TestHdfsLoad extends TestCase {
         writer.writeArrayRecord(array);
       }
     }
+
+    @Override
+    public long getRowsRead() {
+      return NUMBER_OF_ROWS_PER_ID;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 6e49cc2..6dcf784 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -142,6 +142,11 @@ public class TestMapReduce extends TestCase {
             String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
       }
     }
+
+    @Override
+    public long getRowsRead() {
+      return NUMBER_OF_ROWS_PER_PARTITION;
+    }
   }
 
   public static class DummyOutputFormat

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2c9a4eb4/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index ba04be9..e824b98 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -32,4 +32,17 @@ public abstract class Extractor {
                            Partition partition,
                            DataWriter writer);
 
+  /**
+   * Return the number of rows read by the last call to
+   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+   * method. This method returns only the number of rows read in the last call,
+   * and not a cumulative total of the number of rows read by this Extractor
+   * since its creation. If no calls were made to the run method, this method's
+   * behavior is undefined.
+   *
+   * @return the number of rows read by the last call to
+   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) }
+   */
+  public abstract long getRowsRead();
+
 }