You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/02/11 22:56:32 UTC

git commit: Use EB HadoopCompat for compat with Hadoop 0.2.x Patch by Ben Coversaton, reviewed by brandonwilliams for CASSANDRA-5201

Updated Branches:
  refs/heads/trunk 6de2fd9bf -> f007a3535


Use EB HadoopCompat for compat with Hadoop 0.2.x
Patch by Ben Coversaton, reviewed by brandonwilliams for CASSANDRA-5201


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f007a353
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f007a353
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f007a353

Branch: refs/heads/trunk
Commit: f007a35357da582e928dc1ac872e4ebb4c09b708
Parents: 6de2fd9
Author: Brandon Williams <br...@apache.org>
Authored: Tue Feb 11 15:52:07 2014 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Feb 11 15:53:43 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 build.xml                                         |  3 +++
 .../hadoop/AbstractColumnFamilyInputFormat.java   | 11 ++++++-----
 .../hadoop/AbstractColumnFamilyOutputFormat.java  |  3 ++-
 .../apache/cassandra/hadoop/BulkOutputFormat.java |  3 ++-
 .../apache/cassandra/hadoop/BulkRecordWriter.java |  3 ++-
 .../cassandra/hadoop/ColumnFamilyInputFormat.java | 17 +++++++++--------
 .../hadoop/ColumnFamilyRecordReader.java          |  3 ++-
 .../hadoop/ColumnFamilyRecordWriter.java          |  3 ++-
 .../hadoop/cql3/CqlPagingInputFormat.java         | 18 ++++++++++--------
 .../hadoop/cql3/CqlPagingRecordReader.java        |  5 +++--
 .../cassandra/hadoop/cql3/CqlRecordWriter.java    |  3 ++-
 .../cassandra/hadoop/pig/CassandraStorage.java    |  5 +++--
 .../apache/cassandra/hadoop/pig/CqlStorage.java   |  5 +++--
 14 files changed, 50 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d8478e9..eec6296 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,7 @@
  * Avoid repairing already repaired data (CASSANDRA-5351)
 
 2.0.6
+ * Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
  * Fix EstimatedHistogram races (CASSANDRA-6682)
  * Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
  * Add nodetool taketoken to relocate vnodes (CASSANDRA-4445)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d91e0e0..1494585 100644
--- a/build.xml
+++ b/build.xml
@@ -374,6 +374,7 @@
           	<exclusion groupId="org.mortbay.jetty" artifactId="servlet-api"/>
           </dependency>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" version="1.0.3"/>
+          <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" version="4.3"/>
           <dependency groupId="org.apache.pig" artifactId="pig" version="0.11.1"/>
           <dependency groupId="net.java.dev.jna" artifactId="jna" version="4.0.0"/>
 
@@ -417,6 +418,7 @@
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
       	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
+        <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat"/>
         <dependency groupId="org.apache.pig" artifactId="pig"/>
       	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
       </artifact:pom>
@@ -482,6 +484,7 @@
         <!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
+        <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" optional="true"/>
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
 
         <!-- don't need jna to run, but nice to have -->

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 746666b..760193f 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,16 +118,16 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
 
     public List<InputSplit> getSplits(JobContext context) throws IOException
     {
-        Configuration conf = context.getConfiguration();
+        Configuration conf = HadoopCompat.getConfiguration(context);;
 
         validateConfiguration(conf);
 
         // cannonical ranges and nodes holding replicas
         List<TokenRange> masterRangeNodes = getRangeMap(conf);
 
-        keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
-        cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
-        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+        keyspace = ConfigHelper.getInputKeyspace(conf);
+        cfName = ConfigHelper.getInputColumnFamily(conf);
+        partitioner = ConfigHelper.getInputPartitioner(conf);
         logger.debug("partitioner is {}", partitioner);
 
 
@@ -344,7 +345,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     //
     public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     {
-        TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID());
+        TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
         List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
         org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
         for (int i = 0; i < newInputSplits.size(); i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
index 2040f61..a3c4234 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +73,7 @@ public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputForma
      */
     public void checkOutputSpecs(JobContext context)
     {
-        checkOutputSpecs(context.getConfiguration());
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
     }
 
     protected void checkOutputSpecs(Configuration conf)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index f1c5f39..566d5ee 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
@@ -32,7 +33,7 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
     @Override
     public void checkOutputSpecs(JobContext context)
     {
-        checkOutputSpecs(context.getConfiguration());
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
     }
 
     private void checkOutputSpecs(Configuration conf)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index f761a8c..a8e2e13 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +86,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 
     BulkRecordWriter(TaskAttemptContext context)
     {
-        this(context.getConfiguration());
+        this(HadoopCompat.getConfiguration(context));
         this.progress = new Progressable(context);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index fbd5bf2..362cd70 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.hadoop.conf.Configuration;
@@ -55,14 +56,14 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt
 
     public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
-        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
-        {
-            @Override
-            public void progress()
-            {
-                reporter.progress();
-            }
-        };
+        TaskAttemptContext tac = HadoopCompat.newMapContext(
+                jobConf,
+                TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
+                null,
+                null,
+                null,
+                new ReporterWrapper(reporter),
+                null);
 
         ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT));
         recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index be18f5f..ef883fd 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.*;
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,7 +133,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
     {
         this.split = (ColumnFamilySplit) split;
-        Configuration conf = context.getConfiguration();
+        Configuration conf = HadoopCompat.getConfiguration(context);
         KeyRange jobRange = ConfigHelper.getInputKeyRange(conf);
         filter = jobRange == null ? null : jobRange.row_filter;
         predicate = ConfigHelper.getInputSlicePredicate(conf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 6823342..0ae2a67 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -23,6 +23,7 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
@@ -60,7 +61,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
      */
     ColumnFamilyRecordWriter(TaskAttemptContext context)
     {
-        this(context.getConfiguration());
+        this(HadoopCompat.getConfiguration(context));
         this.progressable = new Progressable(context);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
index 0e1509e..6f4478e 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ReporterWrapper;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -58,14 +60,14 @@ public class CqlPagingInputFormat extends AbstractColumnFamilyInputFormat<Map<St
     public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
             throws IOException
     {
-        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
-        {
-            @Override
-            public void progress()
-            {
-                reporter.progress();
-            }
-        };
+        TaskAttemptContext tac = HadoopCompat.newMapContext(
+                jobConf,
+                TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
+                null,
+                null,
+                null,
+                new ReporterWrapper(reporter),
+                null);
 
         CqlPagingRecordReader recordReader = new CqlPagingRecordReader();
         recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index 002992f..f712584 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -26,6 +26,7 @@ import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,7 +105,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
     {
         this.split = (ColumnFamilySplit) split;
-        Configuration conf = context.getConfiguration();
+        Configuration conf = HadoopCompat.getConfiguration(context);
         totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
                       ? (int) this.split.getLength()
                       : ConfigHelper.getInputSplitSize(conf);
@@ -123,7 +124,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
             pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
         }
 
-        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+        partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index e354ad6..9742762 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,7 +85,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
      */
     CqlRecordWriter(TaskAttemptContext context) throws IOException
     {
-        this(context.getConfiguration());
+        this(HadoopCompat.getConfiguration(context));
         this.progressable = new Progressable(context);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index ae18d20..56f66bb 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -282,7 +283,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
-        conf = job.getConfiguration();
+        conf = HadoopCompat.getConfiguration(job);
         setLocationFromUri(location);
 
         if (ConfigHelper.getInputSlicePredicate(conf) == null)
@@ -339,7 +340,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** set store configuration settings */
     public void setStoreLocation(String location, Job job) throws IOException
     {
-        conf = job.getConfiguration();
+        conf = HadoopCompat.getConfiguration(job);
         
         // don't combine mappers to a single mapper per node
         conf.setBoolean("pig.noSplitCombination", true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 76d8026..b349cf7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
+import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.composites.CellNames;
@@ -197,7 +198,7 @@ public class CqlStorage extends AbstractCassandraStorage
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
-        conf = job.getConfiguration();
+        conf = HadoopCompat.getConfiguration(job);
         setLocationFromUri(location);
 
         if (username != null && password != null)
@@ -256,7 +257,7 @@ public class CqlStorage extends AbstractCassandraStorage
     /** set store configuration settings */
     public void setStoreLocation(String location, Job job) throws IOException
     {
-        conf = job.getConfiguration();
+        conf = HadoopCompat.getConfiguration(job);
         setLocationFromUri(location);
 
         if (username != null && password != null)