You are viewing a plain-text version of this content; the canonical (linked) version is available in the original mailing-list archive.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/02/11 22:56:32 UTC
git commit: Use EB HadoopCompat for compat with Hadoop 0.2.x. Patch by
Ben Coverston, reviewed by brandonwilliams for CASSANDRA-5201
Updated Branches:
refs/heads/trunk 6de2fd9bf -> f007a3535
Use EB (Elephant Bird) HadoopCompat for compat with Hadoop 0.2.x
Patch by Ben Coverston, reviewed by brandonwilliams for CASSANDRA-5201
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f007a353
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f007a353
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f007a353
Branch: refs/heads/trunk
Commit: f007a35357da582e928dc1ac872e4ebb4c09b708
Parents: 6de2fd9
Author: Brandon Williams <br...@apache.org>
Authored: Tue Feb 11 15:52:07 2014 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Feb 11 15:53:43 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 3 +++
.../hadoop/AbstractColumnFamilyInputFormat.java | 11 ++++++-----
.../hadoop/AbstractColumnFamilyOutputFormat.java | 3 ++-
.../apache/cassandra/hadoop/BulkOutputFormat.java | 3 ++-
.../apache/cassandra/hadoop/BulkRecordWriter.java | 3 ++-
.../cassandra/hadoop/ColumnFamilyInputFormat.java | 17 +++++++++--------
.../hadoop/ColumnFamilyRecordReader.java | 3 ++-
.../hadoop/ColumnFamilyRecordWriter.java | 3 ++-
.../hadoop/cql3/CqlPagingInputFormat.java | 18 ++++++++++--------
.../hadoop/cql3/CqlPagingRecordReader.java | 5 +++--
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 3 ++-
.../cassandra/hadoop/pig/CassandraStorage.java | 5 +++--
.../apache/cassandra/hadoop/pig/CqlStorage.java | 5 +++--
14 files changed, 50 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d8478e9..eec6296 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,7 @@
* Avoid repairing already repaired data (CASSANDRA-5351)
2.0.6
+ * Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
* Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
* Add nodetool taketoken to relocate vnodes (CASSANDRA-4445)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d91e0e0..1494585 100644
--- a/build.xml
+++ b/build.xml
@@ -374,6 +374,7 @@
<exclusion groupId="org.mortbay.jetty" artifactId="servlet-api"/>
</dependency>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" version="1.0.3"/>
+ <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" version="4.3"/>
<dependency groupId="org.apache.pig" artifactId="pig" version="0.11.1"/>
<dependency groupId="net.java.dev.jna" artifactId="jna" version="4.0.0"/>
@@ -417,6 +418,7 @@
<dependency groupId="org.apache.rat" artifactId="apache-rat"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
+ <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat"/>
<dependency groupId="org.apache.pig" artifactId="pig"/>
<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
</artifact:pom>
@@ -482,6 +484,7 @@
<!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
+ <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" optional="true"/>
<dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
<!-- don't need jna to run, but nice to have -->
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 746666b..760193f 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,16 +118,16 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
public List<InputSplit> getSplits(JobContext context) throws IOException
{
- Configuration conf = context.getConfiguration();
+ Configuration conf = HadoopCompat.getConfiguration(context);;
validateConfiguration(conf);
// cannonical ranges and nodes holding replicas
List<TokenRange> masterRangeNodes = getRangeMap(conf);
- keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
- cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
- partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+ keyspace = ConfigHelper.getInputKeyspace(conf);
+ cfName = ConfigHelper.getInputColumnFamily(conf);
+ partitioner = ConfigHelper.getInputPartitioner(conf);
logger.debug("partitioner is {}", partitioner);
@@ -344,7 +345,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
//
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
{
- TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID());
+ TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
for (int i = 0; i < newInputSplits.size(); i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
index 2040f61..a3c4234 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +73,7 @@ public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputForma
*/
public void checkOutputSpecs(JobContext context)
{
- checkOutputSpecs(context.getConfiguration());
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
}
protected void checkOutputSpecs(Configuration conf)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index f1c5f39..566d5ee 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.cassandra.thrift.Mutation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
@@ -32,7 +33,7 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
@Override
public void checkOutputSpecs(JobContext context)
{
- checkOutputSpecs(context.getConfiguration());
+ checkOutputSpecs(HadoopCompat.getConfiguration(context));
}
private void checkOutputSpecs(Configuration conf)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index f761a8c..a8e2e13 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +86,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
BulkRecordWriter(TaskAttemptContext context)
{
- this(context.getConfiguration());
+ this(HadoopCompat.getConfiguration(context));
this.progress = new Progressable(context);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index fbd5bf2..362cd70 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.composites.CellName;
import org.apache.hadoop.conf.Configuration;
@@ -55,14 +56,14 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt
public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
{
- TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
- {
- @Override
- public void progress()
- {
- reporter.progress();
- }
- };
+ TaskAttemptContext tac = HadoopCompat.newMapContext(
+ jobConf,
+ TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
+ null,
+ null,
+ null,
+ new ReporterWrapper(reporter),
+ null);
ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT));
recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index be18f5f..ef883fd 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.*;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.cassandra.db.Cell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,7 +133,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
{
this.split = (ColumnFamilySplit) split;
- Configuration conf = context.getConfiguration();
+ Configuration conf = HadoopCompat.getConfiguration(context);
KeyRange jobRange = ConfigHelper.getInputKeyRange(conf);
filter = jobRange == null ? null : jobRange.row_filter;
predicate = ConfigHelper.getInputSlicePredicate(conf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 6823342..0ae2a67 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -23,6 +23,7 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.thrift.*;
@@ -60,7 +61,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
*/
ColumnFamilyRecordWriter(TaskAttemptContext context)
{
- this(context.getConfiguration());
+ this(HadoopCompat.getConfiguration(context));
this.progressable = new Progressable(context);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
index 0e1509e..6f4478e 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ReporterWrapper;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -58,14 +60,14 @@ public class CqlPagingInputFormat extends AbstractColumnFamilyInputFormat<Map<St
public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
throws IOException
{
- TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
- {
- @Override
- public void progress()
- {
- reporter.progress();
- }
- };
+ TaskAttemptContext tac = HadoopCompat.newMapContext(
+ jobConf,
+ TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
+ null,
+ null,
+ null,
+ new ReporterWrapper(reporter),
+ null);
CqlPagingRecordReader recordReader = new CqlPagingRecordReader();
recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index 002992f..f712584 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -26,6 +26,7 @@ import java.util.*;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,7 +105,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
{
this.split = (ColumnFamilySplit) split;
- Configuration conf = context.getConfiguration();
+ Configuration conf = HadoopCompat.getConfiguration(context);
totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
? (int) this.split.getLength()
: ConfigHelper.getInputSplitSize(conf);
@@ -123,7 +124,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
}
- partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+ partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index e354ad6..9742762 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
*/
CqlRecordWriter(TaskAttemptContext context) throws IOException
{
- this(context.getConfiguration());
+ this(HadoopCompat.getConfiguration(context));
this.progressable = new Progressable(context);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index ae18d20..56f66bb 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.cassandra.db.Cell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -282,7 +283,7 @@ public class CassandraStorage extends AbstractCassandraStorage
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
- conf = job.getConfiguration();
+ conf = HadoopCompat.getConfiguration(job);
setLocationFromUri(location);
if (ConfigHelper.getInputSlicePredicate(conf) == null)
@@ -339,7 +340,7 @@ public class CassandraStorage extends AbstractCassandraStorage
/** set store configuration settings */
public void setStoreLocation(String location, Job job) throws IOException
{
- conf = job.getConfiguration();
+ conf = HadoopCompat.getConfiguration(job);
// don't combine mappers to a single mapper per node
conf.setBoolean("pig.noSplitCombination", true);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 76d8026..b349cf7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
+import com.twitter.elephantbird.util.HadoopCompat;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.composites.CellNames;
@@ -197,7 +198,7 @@ public class CqlStorage extends AbstractCassandraStorage
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
- conf = job.getConfiguration();
+ conf = HadoopCompat.getConfiguration(job);
setLocationFromUri(location);
if (username != null && password != null)
@@ -256,7 +257,7 @@ public class CqlStorage extends AbstractCassandraStorage
/** set store configuration settings */
public void setStoreLocation(String location, Job job) throws IOException
{
- conf = job.getConfiguration();
+ conf = HadoopCompat.getConfiguration(job);
setLocationFromUri(location);
if (username != null && password != null)