You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/05/29 19:55:00 UTC
[2/3] git commit: Pig: disable split combination,
add split size param Patch by Alex Liu,
reviewed by brandonwilliams for CASSANDRA-5544
Pig: disable split combination, add split size param
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5544
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d7404bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d7404bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d7404bc
Branch: refs/heads/trunk
Commit: 6d7404bc2eec2c34feb6e2b9db938f9e0e5ae208
Parents: 27e8f87
Author: Brandon Williams <br...@apache.org>
Authored: Wed May 29 12:53:52 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed May 29 12:53:52 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
examples/pig/README.txt | 4 +++
.../cassandra/hadoop/pig/CassandraStorage.java | 21 +++++++++++++++
3 files changed, 26 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34e5b52..61bd4b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
Merged from 1.1:
* Remove buggy thrift max message length option (CASSANDRA-5529)
* Fix NPE in Pig's widerow mode (CASSANDRA-5488)
+ * Add split size parameter to Pig and disable split combination (CASSANDRA-5544)
1.2.5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index 3ef7858..e3d9af6 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -88,3 +88,7 @@ PIG_USE_SECONDARY: this allows easy use of secondary indexes within your
can also be set in the LOAD url by adding the
'use_secondary=true' parameter.
+PIG_INPUT_SPLIT_SIZE: this sets the split size passed to Hadoop, controlling
+ the number of mapper tasks created. This can also be set in the LOAD url by
+ adding the 'split_size=X' parameter, where X is an integer split size.
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d7404bc/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 0854758..6490d05 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -77,6 +77,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
+ public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
@@ -105,6 +106,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private int limit;
private boolean widerows = false;
private boolean usePartitionFilter = false;
+ private int splitSize = 64 * 1024;
// wide row hacks
private ByteBuffer lastKey;
private Map<ByteBuffer,IColumn> lastRow;
@@ -516,6 +518,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
if (urlQuery.containsKey("use_secondary"))
usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
+ if (urlQuery.containsKey("split_size"))
+ splitSize = Integer.parseInt(urlQuery.get("split_size"));
}
String[] parts = urlParts[0].split("/+");
String[] credentialsAndKeyspace = parts[1].split("@");
@@ -591,6 +595,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
public void setLocation(String location, Job job) throws IOException
{
conf = job.getConfiguration();
+
+ // don't combine mappers to a single mapper per node
+ conf.setBoolean("pig.noSplitCombination", true);
setLocationFromUri(location);
if (ConfigHelper.getInputSlicePredicate(conf) == null)
@@ -603,12 +610,26 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
widerows = Boolean.valueOf(System.getenv(PIG_WIDEROW_INPUT));
if (System.getenv(PIG_USE_SECONDARY) != null)
usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+ if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+ {
+ try
+ {
+ ConfigHelper.setInputSplitSize(conf, Integer.valueOf(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException("PIG_INPUT_SPLIT_SIZE is not a number", e);
+ }
+ }
if (usePartitionFilter && getIndexExpressions() != null)
ConfigHelper.setInputRange(conf, getIndexExpressions());
if (username != null && password != null)
ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+
+ if (splitSize > 0)
+ ConfigHelper.setInputSplitSize(conf, splitSize);
ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
setConnectionInformation();