Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/12/10 18:43:19 UTC

[04/10] cassandra git commit: Fix regression in split size on CqlInputFormat

Fix regression in split size on CqlInputFormat

Patch by aaliev; reviewed by jmckenzie for CASSANDRA-10835


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f3e47bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f3e47bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f3e47bf

Branch: refs/heads/cassandra-3.0
Commit: 4f3e47bf9edbfefec2f85ecd915ac8fbbc81de8b
Parents: c8493c4
Author: Artem Aliev <ar...@gmail.com>
Authored: Thu Dec 10 12:33:58 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 10 12:33:58 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/AbstractColumnFamilyInputFormat.java | 36 ++++++++++++++------
 .../apache/cassandra/hadoop/ConfigHelper.java   | 26 +++++++++++++-
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  8 +++--
 4 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd6b92e..30a76a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
  * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 3c088c2..d55f205 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -18,8 +18,19 @@
 package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,18 +42,19 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TokenRange;
-
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.hadoop.cql3.*;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.thrift.KeyRange;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
 {
@@ -230,9 +242,10 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
     {
         int splitSize = ConfigHelper.getInputSplitSize(conf);
+        int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
         try
         {
-            return describeSplits(keyspace, cfName, range, splitSize, session);
+            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
         }
         catch (Exception e)
         {
@@ -252,7 +265,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         }
     }
 
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, Session session)
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
     {
         String query = String.format("SELECT mean_partition_size, partitions_count " +
                                      "FROM %s.%s " +
@@ -275,7 +288,10 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         long meanPartitionSize = row.getLong("mean_partition_size");
         long partitionCount = row.getLong("partitions_count");
 
-        int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+        int splitCount = splitSizeMb > 0
+            ? (int)(meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024)
+            : (int)(partitionCount / splitSize);
+
         if (splitCount <= 0) splitCount = 1;
         List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
         Map<TokenRange, Long> rangesWithLength = new HashMap<>();
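
To make the new split-count arithmetic concrete, here is a minimal standalone sketch of the computation above. The sample numbers are made up for illustration; in the real code they come from the mean_partition_size / partitions_count query shown in describeSplits.

    public class SplitCountSketch
    {
        public static void main(String[] args)
        {
            long meanPartitionSize = 4096L;      // bytes per partition (sample value)
            long partitionCount = 10_000_000L;   // partitions in the token range (sample value)
            int splitSize = 64 * 1024;           // legacy knob: partitions per split
            int splitSizeMb = 64;                // new knob: MB per split, -1 when unset

            // Same expression as the patch: the MB-based knob wins when it is set.
            int splitCount = splitSizeMb > 0
                ? (int) (meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024)
                : (int) (partitionCount / splitSize);
            if (splitCount <= 0)
                splitCount = 1;

            // ~40 GB of data at 64 MB per split => 610 splits here; with the MB
            // knob unset it would be 10_000_000 / 65_536 = 152 splits instead.
            System.out.println(splitCount);
        }
    }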

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index e81860d..376c250 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -54,6 +54,7 @@ public class ConfigHelper
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
     private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
+    private static final String INPUT_SPLIT_SIZE_IN_MB_CONFIG = "cassandra.input.split.size_mb";
     private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows";
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
     private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
@@ -176,7 +177,7 @@ public class ConfigHelper
      * the overhead of each map will take up the bulk of the job time.
      *
      * @param conf      Job configuration you are about to run
-     * @param splitsize Size of the input split
+     * @param splitsize Number of partitions in the input split
      */
     public static void setInputSplitSize(Configuration conf, int splitsize)
     {
@@ -189,6 +190,29 @@ public class ConfigHelper
     }
 
     /**
+     * Set the size of the input split in MB. If this is not set, the getInputSplitSize value is used instead.
+     * This affects the number of maps created; if the number is too small,
+     * the overhead of each map will take up the bulk of the job time.
+     *
+     * @param conf        Job configuration you are about to run
+     * @param splitSizeMb Input split size in MB
+     */
+    public static void setInputSplitSizeInMb(Configuration conf, int splitSizeMb)
+    {
+        conf.setInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, splitSizeMb);
+    }
+
+    /**
+     * cassandra.input.split.size will be used if the value is undefined or negative.
+     * @param conf  Job configuration you are about to run
+     * @return      split size in MB or -1 if it is undefined.
+     */
+    public static int getInputSplitSizeInMb(Configuration conf)
+    {
+        return conf.getInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, -1);
+    }
+
+    /**
      * Set the predicate that determines what columns will be selected from each row.
      *
      * @param conf      Job configuration you are about to run
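
A short sketch of the contract the two new methods give callers; the property name is taken from the patch, and the scaffolding class around it is illustrative only.

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;

    public class SplitSizeConfigSketch
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();

            // Unset: the getter returns -1, so describeSplits falls back to
            // the partition-count-based cassandra.input.split.size knob.
            System.out.println(ConfigHelper.getInputSplitSizeInMb(conf)); // -1

            // Set: splits are sized by data volume instead of partition count.
            ConfigHelper.setInputSplitSizeInMb(conf, 256);
            System.out.println(ConfigHelper.getInputSplitSizeInMb(conf)); // 256

            // Equivalent to setting the raw Hadoop property directly:
            conf.setInt("cassandra.input.split.size_mb", 256);
        }
    }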

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 36da92d..c46ceb8 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -40,10 +40,12 @@ import com.datastax.driver.core.Row;
  *   ConfigHelper.setInputColumnFamily
  *
  * You can also configure the number of rows per InputSplit with
- *   ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ *   1: ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ *   or
+ *   2: ConfigHelper.setInputSplitSizeInMb, a newer, more precise method that sets the InputSplit size in MB.
+ *   If no value is provided for InputSplitSizeInMb, InputSplitSize will be used.
  *
- *   the number of CQL rows per page
- *   CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You 
+ *   CqlConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
 *   should set it to "as big as possible, but no bigger." It sets the LIMIT for the CQL
 *   query, so you need to set it big enough to minimize the network overhead, and also
 *   not so big that it causes out-of-memory issues.
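
Putting the two knobs together, a hedged end-to-end sketch of wiring up an input job along the lines of the comment above. The keyspace and table names are placeholders, and the String-typed page-row-size argument is an assumption, not something this patch confirms.

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CqlInputJobSketch
    {
        public static void main(String[] args) throws Exception
        {
            Job job = Job.getInstance(new Configuration(), "cql-input-sketch");
            Configuration conf = job.getConfiguration();

            // Required: the keyspace and table to read.
            ConfigHelper.setInputColumnFamily(conf, "my_keyspace", "my_table");

            // Option 1: legacy partition-count-based splits (default 64k rows)...
            ConfigHelper.setInputSplitSize(conf, 64 * 1024);
            // ...or option 2: the newer MB-based splits, which take precedence
            // over option 1 whenever both are set.
            ConfigHelper.setInputSplitSizeInMb(conf, 64);

            // CQL paging, as described above; 1000 is the default page row size.
            CqlConfigHelper.setInputCQLPageRowSize(conf, "1000");

            job.setInputFormatClass(CqlInputFormat.class);
            // Mapper/reducer/output configuration elided.
        }
    }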