You are viewing a plain-text version of this content. The canonical link to it was a "here" hyperlink on the original page; the link itself did not survive conversion to plain text.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/03/19 19:45:46 UTC

[2/2] git commit: Add progress reporting to hadoop output formats. Patch by brandonwilliams, reviewed by goffinet for CASSANDRA-3859

Add progress reporting to hadoop output formats.
Patch by brandonwilliams, reviewed by goffinet for CASSANDRA-3859


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd806ddd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd806ddd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd806ddd

Branch: refs/heads/cassandra-1.1
Commit: cd806ddd266d69f83f5a5a12283945a7c9fd4e8d
Parents: 438acfc
Author: Brandon Williams <br...@apache.org>
Authored: Mon Mar 19 13:39:58 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Mar 19 13:39:58 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/hadoop/BulkOutputFormat.java  |    2 +-
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |   33 ++++++++--
 .../cassandra/hadoop/ColumnFamilyOutputFormat.java |    2 +-
 .../cassandra/hadoop/ColumnFamilyRecordWriter.java |    9 +++
 .../org/apache/cassandra/hadoop/Progressable.java  |   50 +++++++++++++++
 5 files changed, 88 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index 9902b6b..8fd6f7c 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -67,7 +67,7 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
     @Deprecated
     public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
     {
-        return new BulkRecordWriter(job);
+        return new BulkRecordWriter(job, new Progressable(progress)); // wrap the old-API (mapred) callback so the writer can report progress while writing and streaming
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index b247c8d..69be255 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -27,6 +27,8 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -57,6 +59,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     private SSTableSimpleUnsortedWriter writer;
     private SSTableLoader loader;
     private File outputdir;
+    private Progressable progress; // NOTE(review): not assigned when only BulkRecordWriter(Configuration) runs, so the progress.progress() calls below would NPE for writers built that way
 
     private enum CFType
     {
@@ -76,6 +79,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     BulkRecordWriter(TaskAttemptContext context) throws IOException
     {
         this(context.getConfiguration());
+        this.progress = new Progressable(context); // new (mapreduce) API: report through the task context
+    }
+
+    BulkRecordWriter(Configuration conf, Progressable progress) throws IOException
+    {
+        this(conf);
+        this.progress = progress; // old (mapred) API: caller passes the already-wrapped callback
     }
 
     BulkRecordWriter(Configuration conf) throws IOException
@@ -169,6 +179,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                         writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
 	            }
             }
+            progress.progress(); // report progress to Hadoop after the mutations above have been handed to the writer
         }
     }
     @Override
@@ -189,13 +200,23 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         if (writer != null)
         {
             writer.close();
-            try
+            SSTableLoader.LoaderFuture future = loader.stream();
+            while (true)
             {
-                loader.stream().get();
-            }
-            catch (InterruptedException e)
-            {
-                throw new IOException(e);
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS); // wait in 1-second slices so progress can be reported while streaming runs
+                    break;
+                }
+                catch (TimeoutException te)
+                {
+                    progress.progress(); // streaming still in flight: tell Hadoop the task is alive
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt(); // restore the interrupt flag before translating to IOException
+                    throw new IOException(e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index ea2ef11..668c4aa 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -119,7 +119,7 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
     @Deprecated @Override
     public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
     {
-        return new ColumnFamilyRecordWriter(job);
+        return new ColumnFamilyRecordWriter(job, new Progressable(progress)); // wrap the old-API (mapred) callback so writes can report progress
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index f6d0ed4..5e36a80 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -78,6 +78,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     private final long batchThreshold;
 
     private final ConsistencyLevel consistencyLevel;
+    private Progressable progressable; // NOTE(review): not assigned when only ColumnFamilyRecordWriter(Configuration) runs, so progressable.progress() would NPE for writers built that way
 
 
     /**
@@ -90,6 +91,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
     {
         this(context.getConfiguration());
+        this.progressable = new Progressable(context); // new (mapreduce) API: report through the task context
+    }
+
+    ColumnFamilyRecordWriter(Configuration conf, Progressable progressable) throws IOException
+    {
+        this(conf);
+        this.progressable = progressable; // old (mapred) API: caller passes the already-wrapped callback
     }
 
     ColumnFamilyRecordWriter(Configuration conf) throws IOException
@@ -133,6 +141,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 
         for (Mutation amut : value)
             client.put(new Pair<ByteBuffer,Mutation>(keybuff, amut));
+        progressable.progress(); // runs once per call, after the loop: the unbraced for above covers only client.put
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd806ddd/src/java/org/apache/cassandra/hadoop/Progressable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/Progressable.java b/src/java/org/apache/cassandra/hadoop/Progressable.java
new file mode 100644
index 0000000..091a828
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/Progressable.java
@@ -0,0 +1,50 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** Forwards progress reports to either a new-API (mapreduce) task context or an old-API (mapred) progress callback. */
+public class Progressable
+{
+    private TaskAttemptContext context; // set only by the TaskAttemptContext constructor
+    private org.apache.hadoop.util.Progressable progressable; // set only by the hadoop.util.Progressable constructor
+
+    Progressable(TaskAttemptContext context)
+    {
+        this.context = context;
+    }
+
+    Progressable(org.apache.hadoop.util.Progressable progressable)
+    {
+        this.progressable = progressable;
+    }
+
+    /** Reports progress via the task context if present, else via the wrapped callback; a no-op when neither was supplied. */
+    public void progress()
+    {
+        if (context != null)
+            context.progress();
+        else if (progressable != null)
+            progressable.progress();
+    }
+}