You are viewing a plain-text version of this content; the hyperlink to the canonical version is not included in this text rendering.
Posted to commits@drill.apache.org by ja...@apache.org on 2016/01/11 08:53:54 UTC

[06/12] drill git commit: DRILL-4241: Add parallelization and assignment

DRILL-4241: Add parallelization and assignment


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4f3f401e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4f3f401e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4f3f401e

Branch: refs/heads/master
Commit: 4f3f401e4c299b007218e5231ad43f9977f01dd2
Parents: b1d9d79
Author: Steven Phillips <sm...@apache.org>
Authored: Thu Nov 19 16:00:40 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jan 10 22:54:21 2016 -0800

----------------------------------------------------------------------
 .../drill/exec/store/kudu/KuduGroupScan.java    | 99 +++++++++++++++++++-
 .../drill/exec/store/kudu/KuduRecordReader.java |  6 +-
 .../drill/exec/store/kudu/KuduSubScan.java      | 16 +++-
 3 files changed, 114 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4f3f401e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index bc543d9..8403632 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -18,9 +18,14 @@
 package org.apache.drill.exec.store.kudu;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
@@ -40,16 +45,27 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.kududb.client.LocatedTablet;
+import org.kududb.client.LocatedTablet.Replica;
 
 @JsonTypeName("kudu-scan")
 public class KuduGroupScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
 
   private KuduStoragePluginConfig storagePluginConfig;
   private List<SchemaPath> columns;
   private KuduScanSpec kuduScanSpec;
   private KuduStoragePlugin storagePlugin;
   private boolean filterPushedDown = false;
+  private List<KuduWork> kuduWorkList = Lists.newArrayList();
+  private ListMultimap<Integer,KuduWork> assignments;
+  private List<EndpointAffinity> affinities;
 
 
   @JsonCreator
@@ -67,6 +83,67 @@ public class KuduGroupScan extends AbstractGroupScan {
     this.storagePluginConfig = storagePlugin.getConfig();
     this.kuduScanSpec = scanSpec;
     this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
+    init();
+  }
+
+  private void init() {
+    String tableName = kuduScanSpec.getTableName();
+    Collection<DrillbitEndpoint> endpoints = storagePlugin.getContext().getBits();
+    Map<String,DrillbitEndpoint> endpointMap = Maps.newHashMap();
+    for (DrillbitEndpoint endpoint : endpoints) {
+      endpointMap.put(endpoint.getAddress(), endpoint);
+    }
+    try {
+      List<LocatedTablet> locations = storagePlugin.getClient().openTable(tableName).getTabletsLocations(10000);
+      for (LocatedTablet tablet : locations) {
+        KuduWork work = new KuduWork(tablet.getPartition().getPartitionKeyStart(), tablet.getPartition().getPartitionKeyEnd());
+        for (Replica replica : tablet.getReplicas()) {
+          String host = replica.getRpcHost();
+          DrillbitEndpoint ep = endpointMap.get(host);
+          if (ep != null) {
+            work.getByteMap().add(ep, DEFAULT_TABLET_SIZE);
+          }
+        }
+        kuduWorkList.add(work);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static class KuduWork implements CompleteWork {
+
+    private EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+    private byte[] partitionKeyStart;
+    private byte[] partitionKeyEnd;
+
+    public KuduWork(byte[] partitionKeyStart, byte[] partitionKeyEnd) {
+      this.partitionKeyStart = partitionKeyStart;
+      this.partitionKeyEnd = partitionKeyEnd;
+    }
+
+    public byte[] getPartitionKeyStart() {
+      return partitionKeyStart;
+    }
+
+    public byte[] getPartitionKeyEnd() {
+      return partitionKeyEnd;
+    }
+
+    @Override
+    public long getTotalBytes() {
+      return DEFAULT_TABLET_SIZE;
+    }
+
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
+    }
+
+    @Override
+    public int compareTo(CompleteWork o) {
+      return 0;
+    }
   }
 
   /**
@@ -80,6 +157,8 @@ public class KuduGroupScan extends AbstractGroupScan {
     this.storagePlugin = that.storagePlugin;
     this.storagePluginConfig = that.storagePluginConfig;
     this.filterPushedDown = that.filterPushedDown;
+    this.kuduWorkList = that.kuduWorkList;
+    this.assignments = that.assignments;
   }
 
   @Override
@@ -91,13 +170,16 @@ public class KuduGroupScan extends AbstractGroupScan {
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.EMPTY_LIST;
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(kuduWorkList);
+    }
+    return affinities;
   }
 
 
   @Override
   public int getMaxParallelizationWidth() {
-    return 1;
+    return kuduWorkList.size();
   }
 
 
@@ -107,14 +189,21 @@ public class KuduGroupScan extends AbstractGroupScan {
    */
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+    assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList, storagePlugin.getContext());
   }
 
 
   @Override
   public KuduSubScan getSpecificScan(int minorFragmentId) {
-    return new KuduSubScan(storagePlugin, storagePluginConfig,
-        ImmutableList.of(new KuduSubScanSpec(kuduScanSpec.getTableName())),
-        this.columns);
+    List<KuduWork> workList = assignments.get(minorFragmentId);
+
+    List<KuduSubScanSpec> scanSpecList = Lists.newArrayList();
+
+    for (KuduWork work : workList) {
+      scanSpecList.add(new KuduSubScanSpec(getTableName(), work.getPartitionKeyStart(), work.getPartitionKeyEnd()));
+    }
+
+    return new KuduSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns);
   }
 
   // KuduStoragePlugin plugin, KuduStoragePluginConfig config,

http://git-wip-us.apache.org/repos/asf/drill/blob/4f3f401e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 17427f1..adbdb83 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -78,6 +78,7 @@ public class KuduRecordReader extends AbstractRecordReader {
     setColumns(projectedColumns);
     this.client = client;
     scanSpec = subScanSpec;
+    logger.debug("Scan spec: {}", subScanSpec);
   }
 
   @Override
@@ -94,7 +95,10 @@ public class KuduRecordReader extends AbstractRecordReader {
         }
         builder.setProjectedColumnNames(colNames);
       }
-      scanner = builder.build();
+      scanner = builder
+              .lowerBoundPartitionKeyRaw(scanSpec.getStartKey())
+              .exclusiveUpperBoundPartitionKeyRaw(scanSpec.getEndKey())
+              .build();
     } catch (Exception e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f3f401e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
index 267ee77..9025db7 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
@@ -115,16 +115,30 @@ public class KuduSubScan extends AbstractBase implements SubScan {
   public static class KuduSubScanSpec {
 
     private final String tableName;
+    private final byte[] startKey;
+    private final byte[] endKey;
 
     @JsonCreator
-    public KuduSubScanSpec(@JsonProperty("tableName") String tableName) {
+    public KuduSubScanSpec(@JsonProperty("tableName") String tableName,
+                           @JsonProperty("startKey") byte[] startKey,
+                           @JsonProperty("endKey") byte[] endKey) {
       this.tableName = tableName;
+      this.startKey = startKey;
+      this.endKey = endKey;
     }
 
     public String getTableName() {
       return tableName;
     }
 
+    public byte[] getStartKey() {
+      return startKey;
+    }
+
+    public byte[] getEndKey() {
+      return endKey;
+    }
+
   }
 
   @Override