You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2016/01/11 08:53:54 UTC
[06/12] drill git commit: DRILL-4241: Add parallelization and assignment
DRILL-4241: Add parallelization and assignment
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4f3f401e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4f3f401e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4f3f401e
Branch: refs/heads/master
Commit: 4f3f401e4c299b007218e5231ad43f9977f01dd2
Parents: b1d9d79
Author: Steven Phillips <sm...@apache.org>
Authored: Thu Nov 19 16:00:40 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jan 10 22:54:21 2016 -0800
----------------------------------------------------------------------
.../drill/exec/store/kudu/KuduGroupScan.java | 99 +++++++++++++++++++-
.../drill/exec/store/kudu/KuduRecordReader.java | 6 +-
.../drill/exec/store/kudu/KuduSubScan.java | 16 +++-
3 files changed, 114 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4f3f401e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
index bc543d9..8403632 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -18,9 +18,14 @@
package org.apache.drill.exec.store.kudu;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
@@ -40,16 +45,27 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.kududb.client.LocatedTablet;
+import org.kududb.client.LocatedTablet.Replica;
@JsonTypeName("kudu-scan")
public class KuduGroupScan extends AbstractGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class);
+ private static final long DEFAULT_TABLET_SIZE = 1000;
private KuduStoragePluginConfig storagePluginConfig;
private List<SchemaPath> columns;
private KuduScanSpec kuduScanSpec;
private KuduStoragePlugin storagePlugin;
private boolean filterPushedDown = false;
+ private List<KuduWork> kuduWorkList = Lists.newArrayList();
+ private ListMultimap<Integer,KuduWork> assignments;
+ private List<EndpointAffinity> affinities;
@JsonCreator
@@ -67,6 +83,67 @@ public class KuduGroupScan extends AbstractGroupScan {
this.storagePluginConfig = storagePlugin.getConfig();
this.kuduScanSpec = scanSpec;
this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
+ init();
+ }
+
+ private void init() { // Populates kuduWorkList with one KuduWork per tablet location of the scanned table.
+ String tableName = kuduScanSpec.getTableName();
+ Collection<DrillbitEndpoint> endpoints = storagePlugin.getContext().getBits();
+ Map<String,DrillbitEndpoint> endpointMap = Maps.newHashMap(); // address -> endpoint; endpoints sharing an address overwrite one another
+ for (DrillbitEndpoint endpoint : endpoints) {
+ endpointMap.put(endpoint.getAddress(), endpoint);
+ }
+ try {
+ List<LocatedTablet> locations = storagePlugin.getClient().openTable(tableName).getTabletsLocations(10000); // NOTE(review): 10000 is presumably a deadline in milliseconds — confirm against the Kudu client API
+ for (LocatedTablet tablet : locations) {
+ KuduWork work = new KuduWork(tablet.getPartition().getPartitionKeyStart(), tablet.getPartition().getPartitionKeyEnd());
+ for (Replica replica : tablet.getReplicas()) {
+ String host = replica.getRpcHost();
+ DrillbitEndpoint ep = endpointMap.get(host);
+ if (ep != null) { // replicas whose RPC host matches no endpoint address contribute nothing
+ work.getByteMap().add(ep, DEFAULT_TABLET_SIZE); // every matching replica is credited the same fixed size
+ }
+ }
+ kuduWorkList.add(work);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e); // any failure is rethrown unchecked with the original exception as cause
+ }
+ }
+
+ private static class KuduWork implements CompleteWork { // One schedulable unit of work: a start/end partition key pair plus the endpoints credited for it.
+
+ private EndpointByteMapImpl byteMap = new EndpointByteMapImpl(); // filled by init() with DEFAULT_TABLET_SIZE per matching endpoint
+ private byte[] partitionKeyStart;
+ private byte[] partitionKeyEnd;
+
+ public KuduWork(byte[] partitionKeyStart, byte[] partitionKeyEnd) { // arrays are stored as given, not copied
+ this.partitionKeyStart = partitionKeyStart;
+ this.partitionKeyEnd = partitionKeyEnd;
+ }
+
+ public byte[] getPartitionKeyStart() {
+ return partitionKeyStart;
+ }
+
+ public byte[] getPartitionKeyEnd() {
+ return partitionKeyEnd;
+ }
+
+ @Override
+ public long getTotalBytes() {
+ return DEFAULT_TABLET_SIZE; // fixed placeholder size; the same constant is reported for every work unit
+ }
+
+ @Override
+ public EndpointByteMap getByteMap() {
+ return byteMap;
+ }
+
+ @Override
+ public int compareTo(CompleteWork o) {
+ return 0; // every work unit compares as equal to any other CompleteWork
+ }
}
/**
@@ -80,6 +157,8 @@ public class KuduGroupScan extends AbstractGroupScan {
this.storagePlugin = that.storagePlugin;
this.storagePluginConfig = that.storagePluginConfig;
this.filterPushedDown = that.filterPushedDown;
+ this.kuduWorkList = that.kuduWorkList;
+ this.assignments = that.assignments;
}
@Override
@@ -91,13 +170,16 @@ public class KuduGroupScan extends AbstractGroupScan {
@Override
public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.EMPTY_LIST;
+ if (affinities == null) { // computed from kuduWorkList on the first call, then cached in the field
+ affinities = AffinityCreator.getAffinityMap(kuduWorkList);
+ }
+ return affinities;
}
@Override
public int getMaxParallelizationWidth() {
- return 1;
+ return kuduWorkList.size(); // one per KuduWork entry, i.e. per tablet location added by init()
}
@@ -107,14 +189,21 @@ public class KuduGroupScan extends AbstractGroupScan {
*/
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+ assignments = AssignmentCreator.getMappings(incomingEndpoints, kuduWorkList, storagePlugin.getContext()); // stored mapping is what getSpecificScan later looks up by minor fragment id
}
@Override
public KuduSubScan getSpecificScan(int minorFragmentId) {
- return new KuduSubScan(storagePlugin, storagePluginConfig,
- ImmutableList.of(new KuduSubScanSpec(kuduScanSpec.getTableName())),
- this.columns);
+ List<KuduWork> workList = assignments.get(minorFragmentId); // NOTE(review): assignments has no initializer; it is set by applyAssignments or copied in the copy constructor, so it may be null here otherwise — confirm call order
+
+ List<KuduSubScanSpec> scanSpecList = Lists.newArrayList();
+
+ for (KuduWork work : workList) { // one sub-scan spec per assigned work unit, carrying its start/end partition keys
+ scanSpecList.add(new KuduSubScanSpec(getTableName(), work.getPartitionKeyStart(), work.getPartitionKeyEnd()));
+ }
+
+ return new KuduSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns);
}
// KuduStoragePlugin plugin, KuduStoragePluginConfig config,
http://git-wip-us.apache.org/repos/asf/drill/blob/4f3f401e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 17427f1..adbdb83 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -78,6 +78,7 @@ public class KuduRecordReader extends AbstractRecordReader {
setColumns(projectedColumns);
this.client = client;
scanSpec = subScanSpec;
+ logger.debug("Scan spec: {}", subScanSpec);
}
@Override
@@ -94,7 +95,10 @@ public class KuduRecordReader extends AbstractRecordReader {
}
builder.setProjectedColumnNames(colNames);
}
- scanner = builder.build();
+ scanner = builder
+ .lowerBoundPartitionKeyRaw(scanSpec.getStartKey())
+ .exclusiveUpperBoundPartitionKeyRaw(scanSpec.getEndKey())
+ .build();
} catch (Exception e) {
throw new ExecutionSetupException(e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4f3f401e/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
index 267ee77..9025db7 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
@@ -115,16 +115,30 @@ public class KuduSubScan extends AbstractBase implements SubScan {
public static class KuduSubScanSpec { // Per-fragment scan description: table name plus a start/end partition key pair, bound to JSON properties.
private final String tableName;
+ private final byte[] startKey;
+ private final byte[] endKey;
@JsonCreator
- public KuduSubScanSpec(@JsonProperty("tableName") String tableName) {
+ public KuduSubScanSpec(@JsonProperty("tableName") String tableName,
+ @JsonProperty("startKey") byte[] startKey,
+ @JsonProperty("endKey") byte[] endKey) {
this.tableName = tableName;
+ this.startKey = startKey; // stored as given, not copied
+ this.endKey = endKey; // stored as given, not copied
}
public String getTableName() {
return tableName;
}
+ public byte[] getStartKey() {
+ return startKey; // returns the internal array itself
+ }
+
+ public byte[] getEndKey() {
+ return endKey; // returns the internal array itself
+ }
+
} // NOTE(review): no toString() is visible in this hunk, yet KuduRecordReader logs the spec with "{}" — confirm the log output is useful
@Override