You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/12/10 18:43:20 UTC
[05/10] cassandra git commit: Fix regression in split size on CqlInputFormat
Fix regression in split size on CqlInputFormat
Patch by aaliev; reviewed by jmckenzie for CASSANDRA-10835
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f3e47bf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f3e47bf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f3e47bf
Branch: refs/heads/trunk
Commit: 4f3e47bf9edbfefec2f85ecd915ac8fbbc81de8b
Parents: c8493c4
Author: Artem Aliev <ar...@gmail.com>
Authored: Thu Dec 10 12:33:58 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 10 12:33:58 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/AbstractColumnFamilyInputFormat.java | 36 ++++++++++++++------
.../apache/cassandra/hadoop/ConfigHelper.java | 26 +++++++++++++-
.../cassandra/hadoop/cql3/CqlInputFormat.java | 8 +++--
4 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd6b92e..30a76a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.5
+ * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
* Better handling of SSL connection errors inter-node (CASSANDRA-10816)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 3c088c2..d55f205 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -18,8 +18,19 @@
package org.apache.cassandra.hadoop;
import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,18 +42,19 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TokenRange;
-
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.hadoop.cql3.*;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
{
@@ -230,9 +242,10 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
{
int splitSize = ConfigHelper.getInputSplitSize(conf);
+ int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
try
{
- return describeSplits(keyspace, cfName, range, splitSize, session);
+ return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
}
catch (Exception e)
{
@@ -252,7 +265,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
}
}
- private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, Session session)
+ private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
{
String query = String.format("SELECT mean_partition_size, partitions_count " +
"FROM %s.%s " +
@@ -275,7 +288,10 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
long meanPartitionSize = row.getLong("mean_partition_size");
long partitionCount = row.getLong("partitions_count");
- int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+ int splitCount = splitSizeMb > 0
+ ? (int)(meanPartitionSize * partitionCount / splitSizeMb / 1024 / 1024)
+ : (int)(partitionCount / splitSize);
+
if (splitCount <= 0) splitCount = 1;
List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
Map<TokenRange, Long> rangesWithLength = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index e81860d..376c250 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -54,6 +54,7 @@ public class ConfigHelper
private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
+ private static final String INPUT_SPLIT_SIZE_IN_MB_CONFIG = "cassandra.input.split.size_mb";
private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows";
private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
@@ -176,7 +177,7 @@ public class ConfigHelper
* the overhead of each map will take up the bulk of the job time.
*
* @param conf Job configuration you are about to run
- * @param splitsize Size of the input split
+ * @param splitsize Number of partitions in the input split
*/
public static void setInputSplitSize(Configuration conf, int splitsize)
{
@@ -189,6 +190,29 @@ public class ConfigHelper
}
/**
+ * Set the size of the input split. getInputSplitSize value is used if this is not set.
+ * This affects the number of maps created, if the number is too small
+ * the overhead of each map will take up the bulk of the job time.
+ *
+ * @param conf Job configuration you are about to run
+ * @param splitSizeMb Input split size in MB
+ */
+ public static void setInputSplitSizeInMb(Configuration conf, int splitSizeMb)
+ {
+ conf.setInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, splitSizeMb);
+ }
+
+ /**
+ * cassandra.input.split.size will be used if the value is undefined or negative.
+ * @param conf Job configuration you are about to run
+ * @return split size in MB or -1 if it is undefined.
+ */
+ public static int getInputSplitSizeInMb(Configuration conf)
+ {
+ return conf.getInt(INPUT_SPLIT_SIZE_IN_MB_CONFIG, -1);
+ }
+
+ /**
* Set the predicate that determines what columns will be selected from each row.
*
* @param conf Job configuration you are about to run
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f3e47bf/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 36da92d..c46ceb8 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -40,10 +40,12 @@ import com.datastax.driver.core.Row;
* ConfigHelper.setInputColumnFamily
*
* You can also configure the number of rows per InputSplit with
- * ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ * 1: ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ * or
+ * 2: ConfigHelper.setInputSplitSizeInMb. InputSplit size in MB with new, more precise method
+ * If no value is provided for InputSplitSizeInMb, InputSplitSize will be used.
*
- * the number of CQL rows per page
- * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
+ * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
* should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL
* query, so you need set it big enough to minimize the network overhead, and also
* not too big to avoid out of memory issue.