You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by en...@apache.org on 2015/05/22 09:35:17 UTC
[1/4] phoenix git commit: PHOENIX-1763 Support building with
HBase-1.1.0
Repository: phoenix
Updated Branches:
refs/heads/4.4-HBase-1.1 bf01eb209 -> c2fe34f74
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-pig/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-pig/pom.xml b/phoenix-pig/pom.xml
index 2db1af6..015a660 100644
--- a/phoenix-pig/pom.xml
+++ b/phoenix-pig/pom.xml
@@ -54,7 +54,6 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
- <version>${hbase.version}</version>
<scope>test</scope>
<optional>true</optional>
<exclusions>
@@ -67,7 +66,6 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
@@ -80,41 +78,56 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
- <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
- <version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
- <version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
- <version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index adeed88..a232cf4 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -460,6 +460,13 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>${hbase.version}</version>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d310c37..4361e54 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
<test.output.tofile>true</test.output.tofile>
<!-- Hadoop Versions -->
- <hbase.version>1.0.1</hbase.version>
+ <hbase.version>1.1.0</hbase.version>
<hadoop-two.version>2.5.1</hadoop-two.version>
<!-- Dependency versions -->
@@ -452,6 +452,11 @@
<!-- HBase dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-annotations</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
@@ -488,13 +493,34 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
@@ -508,6 +534,19 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ <version>${hbase.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<!-- Hadoop Dependencies -->
<dependency>
[3/4] phoenix git commit: PHOENIX-1681 Use the new Region Interface
(Andrew Purtell)
Posted by en...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 272cac6..e7e1dd7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
@@ -111,7 +111,7 @@ public class StatisticsCollector {
this.statsTable.close();
}
- public void updateStatistic(HRegion region) {
+ public void updateStatistic(Region region) {
try {
ArrayList<Mutation> mutations = new ArrayList<Mutation>();
writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
@@ -126,7 +126,7 @@ public class StatisticsCollector {
}
}
- private void writeStatsToStatsTable(final HRegion region,
+ private void writeStatsToStatsTable(final Region region,
boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
try {
// update the statistics table
@@ -215,7 +215,7 @@ public class StatisticsCollector {
}
}
- public InternalScanner createCompactionScanner(HRegion region, Store store, InternalScanner s) throws IOException {
+ public InternalScanner createCompactionScanner(Region region, Store store, InternalScanner s) throws IOException {
// See if this is for Major compaction
if (logger.isDebugEnabled()) {
logger.debug("Compaction scanner created for stats");
@@ -224,13 +224,13 @@ public class StatisticsCollector {
return getInternalScanner(region, store, s, cfKey);
}
- public void splitStats(HRegion parent, HRegion left, HRegion right) {
+ public void splitStats(Region parent, Region left, Region right) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Collecting stats for split of " + parent.getRegionInfo() + " into " + left.getRegionInfo() + " and " + right.getRegionInfo());
}
List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
- for (byte[] fam : parent.getStores().keySet()) {
+ for (byte[] fam : parent.getTableDesc().getFamiliesKeys()) {
statsTable.splitStats(parent, left, right, this, new ImmutableBytesPtr(fam), mutations);
}
if (logger.isDebugEnabled()) {
@@ -243,7 +243,7 @@ public class StatisticsCollector {
}
}
- protected InternalScanner getInternalScanner(HRegion region, Store store,
+ protected InternalScanner getInternalScanner(Region region, Store store,
InternalScanner internalScan, ImmutableBytesPtr family) {
return new StatisticsScanner(this, statsTable, region, internalScan, family);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 0e50923..582c4de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -26,9 +26,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
@@ -38,11 +38,11 @@ public class StatisticsScanner implements InternalScanner {
private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
private InternalScanner delegate;
private StatisticsWriter stats;
- private HRegion region;
+ private Region region;
private StatisticsCollector tracker;
private ImmutableBytesPtr family;
- public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region,
+ public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, Region region,
InternalScanner delegate, ImmutableBytesPtr family) {
this.tracker = tracker;
this.stats = stats;
@@ -85,17 +85,17 @@ public class StatisticsScanner implements InternalScanner {
// Just verify if this is fine
ArrayList<Mutation> mutations = new ArrayList<Mutation>();
if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString()
+ LOG.debug("Deleting the stats for the region " + region.getRegionInfo().getRegionNameAsString()
+ " as part of major compaction");
}
- stats.deleteStats(region.getRegionName(), this.tracker, family, mutations);
+ stats.deleteStats(region.getRegionInfo().getRegionName(), this.tracker, family, mutations);
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding new stats for the region " + region.getRegionNameAsString()
+ LOG.debug("Adding new stats for the region " + region.getRegionInfo().getRegionNameAsString()
+ " as part of major compaction");
}
- stats.addStats(region.getRegionName(), this.tracker, family, mutations);
+ stats.addStats(region.getRegionInfo().getRegionName(), this.tracker, family, mutations);
if (LOG.isDebugEnabled()) {
- LOG.debug("Committing new stats for the region " + region.getRegionNameAsString()
+ LOG.debug("Committing new stats for the region " + region.getRegionInfo().getRegionNameAsString()
+ " as part of major compaction");
}
stats.commitStats(mutations);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 8756568..834675c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -105,7 +105,7 @@ public class StatisticsWriter implements Closeable {
statsWriterTable.close();
}
- public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, ImmutableBytesPtr cfKey,
+ public void splitStats(Region p, Region l, Region r, StatisticsCollector tracker, ImmutableBytesPtr cfKey,
List<Mutation> mutations) throws IOException {
if (tracker == null) { return; }
boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
@@ -113,8 +113,8 @@ public class StatisticsWriter implements Closeable {
mutations.add(getLastStatsUpdatedTimePut(clientTimeStamp));
}
long readTimeStamp = useMaxTimeStamp ? HConstants.LATEST_TIMESTAMP : clientTimeStamp;
- Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, cfKey, p.getRegionName(),
- readTimeStamp);
+ Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, cfKey,
+ p.getRegionInfo().getRegionName(), readTimeStamp);
byte[] minKey = HConstants.EMPTY_BYTE_ARRAY;
if (result != null && !result.isEmpty()) {
Cell cell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
@@ -133,13 +133,13 @@ public class StatisticsWriter implements Closeable {
GuidePostsInfo guidePostsRegionInfo = GuidePostsInfo.deserializeGuidePostsInfo(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength(), rowCount);
- byte[] pPrefix = StatisticsUtil.getRowKey(tableName, cfKey, p.getRegionName());
+ byte[] pPrefix = StatisticsUtil.getRowKey(tableName, cfKey, p.getRegionInfo().getRegionName());
mutations.add(new Delete(pPrefix, writeTimeStamp));
long byteSize = 0;
Cell byteSizeCell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
- int index = Collections.binarySearch(guidePostsRegionInfo.getGuidePosts(), r.getStartKey(),
+ int index = Collections.binarySearch(guidePostsRegionInfo.getGuidePosts(), r.getRegionInfo().getStartKey(),
Bytes.BYTES_COMPARATOR);
int size = guidePostsRegionInfo.getGuidePosts().size();
int midEndIndex, midStartIndex;
@@ -175,7 +175,7 @@ public class StatisticsWriter implements Closeable {
tracker.clear();
tracker.addGuidePost(cfKey, lguidePosts, leftByteCount, cell.getTimestamp(),
minKey);
- addStats(l.getRegionName(), tracker, cfKey, mutations);
+ addStats(l.getRegionInfo().getRegionName(), tracker, cfKey, mutations);
}
if (midStartIndex < size) {
GuidePostsInfo rguidePosts =
@@ -184,7 +184,7 @@ public class StatisticsWriter implements Closeable {
tracker.clear();
tracker.addGuidePost(cfKey, rguidePosts, rightByteCount, cell.getTimestamp(),
guidePostsRegionInfo.getGuidePosts().get(midStartIndex));
- addStats(r.getRegionName(), tracker, cfKey, mutations);
+ addStats(r.getRegionInfo().getRegionName(), tracker, cfKey, mutations);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index ca25348..3bf6f23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
@@ -305,47 +305,49 @@ public class IndexUtil {
});
}
- public static HRegion getIndexRegion(RegionCoprocessorEnvironment environment)
+ public static Region getIndexRegion(RegionCoprocessorEnvironment environment)
throws IOException {
- HRegion dataRegion = environment.getRegion();
+ Region dataRegion = environment.getRegion();
return getIndexRegion(dataRegion, environment.getRegionServerServices());
}
- public static HRegion
- getIndexRegion(HRegion dataRegion, RegionServerCoprocessorEnvironment env)
+ public static Region
+ getIndexRegion(Region dataRegion, RegionServerCoprocessorEnvironment env)
throws IOException {
return getIndexRegion(dataRegion, env.getRegionServerServices());
}
- public static HRegion getDataRegion(RegionCoprocessorEnvironment env) throws IOException {
- HRegion indexRegion = env.getRegion();
+ public static Region getDataRegion(RegionCoprocessorEnvironment env) throws IOException {
+ Region indexRegion = env.getRegion();
return getDataRegion(indexRegion, env.getRegionServerServices());
}
- public static HRegion
- getDataRegion(HRegion indexRegion, RegionServerCoprocessorEnvironment env)
+ public static Region
+ getDataRegion(Region indexRegion, RegionServerCoprocessorEnvironment env)
throws IOException {
return getDataRegion(indexRegion, env.getRegionServerServices());
}
- public static HRegion getIndexRegion(HRegion dataRegion, RegionServerServices rss) throws IOException {
+ public static Region getIndexRegion(Region dataRegion, RegionServerServices rss) throws IOException {
TableName indexTableName =
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(dataRegion.getTableDesc()
.getName()));
- List<HRegion> onlineRegions = rss.getOnlineRegions(indexTableName);
- for(HRegion indexRegion : onlineRegions) {
- if (Bytes.compareTo(dataRegion.getStartKey(), indexRegion.getStartKey()) == 0) {
+ List<Region> onlineRegions = rss.getOnlineRegions(indexTableName);
+ for(Region indexRegion : onlineRegions) {
+ if (Bytes.compareTo(dataRegion.getRegionInfo().getStartKey(),
+ indexRegion.getRegionInfo().getStartKey()) == 0) {
return indexRegion;
}
}
return null;
}
- public static HRegion getDataRegion(HRegion indexRegion, RegionServerServices rss) throws IOException {
+ public static Region getDataRegion(Region indexRegion, RegionServerServices rss) throws IOException {
TableName dataTableName = TableName.valueOf(MetaDataUtil.getUserTableName(indexRegion.getTableDesc().getNameAsString()));
- List<HRegion> onlineRegions = rss.getOnlineRegions(dataTableName);
- for(HRegion region : onlineRegions) {
- if (Bytes.compareTo(indexRegion.getStartKey(), region.getStartKey()) == 0) {
+ List<Region> onlineRegions = rss.getOnlineRegions(dataTableName);
+ for(Region region : onlineRegions) {
+ if (Bytes.compareTo(indexRegion.getRegionInfo().getStartKey(),
+ region.getRegionInfo().getStartKey()) == 0) {
return region;
}
}
@@ -466,7 +468,7 @@ public class IndexUtil {
public static void wrapResultUsingOffset(final ObserverContext<RegionCoprocessorEnvironment> c,
List<Cell> result, final int offset, ColumnReference[] dataColumns,
- TupleProjector tupleProjector, HRegion dataRegion, IndexMaintainer indexMaintainer,
+ TupleProjector tupleProjector, Region dataRegion, IndexMaintainer indexMaintainer,
byte[][] viewConstants, ImmutableBytesWritable ptr) throws IOException {
if (tupleProjector != null) {
// Join back to data table here by issuing a local get projecting
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index e996b23..fa8bd85 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -65,7 +65,7 @@ public class TestLocalTableState {
RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
Mockito.when(env.getConfiguration()).thenReturn(conf);
- HRegion region = Mockito.mock(HRegion.class);
+ Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
RegionScanner scanner = Mockito.mock(RegionScanner.class);
Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
@@ -108,7 +108,7 @@ public class TestLocalTableState {
// setup mocks
RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
- HRegion region = Mockito.mock(HRegion.class);
+ Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
RegionScanner scanner = Mockito.mock(RegionScanner.class);
Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
@@ -154,7 +154,7 @@ public class TestLocalTableState {
// setup mocks
RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
- HRegion region = Mockito.mock(HRegion.class);
+ Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
RegionScanner scanner = Mockito.mock(RegionScanner.class);
Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index ae577bd..b381e9f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -201,7 +201,7 @@ public class TestWALRecoveryCaching {
// kill the server where the tables live - this should trigger distributed log splitting
// find the regionserver that matches the passed server
- List<HRegion> online = new ArrayList<HRegion>();
+ List<Region> online = new ArrayList<Region>();
online.addAll(getRegionsFromServerForTable(util.getMiniHBaseCluster(), shared,
testTable.getTableName()));
online.addAll(getRegionsFromServerForTable(util.getMiniHBaseCluster(), shared,
@@ -267,9 +267,9 @@ public class TestWALRecoveryCaching {
* @param table
* @return
*/
- private List<HRegion> getRegionsFromServerForTable(MiniHBaseCluster cluster, ServerName server,
+ private List<Region> getRegionsFromServerForTable(MiniHBaseCluster cluster, ServerName server,
byte[] table) {
- List<HRegion> online = Collections.emptyList();
+ List<Region> online = Collections.emptyList();
for (RegionServerThread rst : cluster.getRegionServerThreads()) {
// if its the server we are going to kill, get the regions we want to reassign
if (rst.getRegionServer().getServerName().equals(server)) {
@@ -305,14 +305,14 @@ public class TestWALRecoveryCaching {
tryIndex = !tryIndex;
for (ServerName server : servers) {
// find the regionserver that matches the passed server
- List<HRegion> online = getRegionsFromServerForTable(cluster, server, table);
+ List<Region> online = getRegionsFromServerForTable(cluster, server, table);
LOG.info("Shutting down and reassigning regions from " + server);
cluster.stopRegionServer(server);
cluster.waitForRegionServerToStop(server, TIMEOUT);
// force reassign the regions from the table
- for (HRegion region : online) {
+ for (Region region : online) {
cluster.getMaster().assignRegion(region.getRegionInfo());
}
@@ -363,10 +363,9 @@ public class TestWALRecoveryCaching {
private Set<ServerName> getServersForTable(MiniHBaseCluster cluster, byte[] table)
throws Exception {
- List<HRegion> indexRegions = cluster.getRegions(table);
Set<ServerName> indexServers = new HashSet<ServerName>();
- for (HRegion region : indexRegions) {
- indexServers.add(cluster.getServerHoldingRegion(null, region.getRegionName()));
+ for (Region region : cluster.getRegions(table)) {
+ indexServers.add(cluster.getServerHoldingRegion(null, region.getRegionInfo().getRegionName()));
}
return indexServers;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/recovery/TestPerRegionIndexWriteCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/recovery/TestPerRegionIndexWriteCache.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/recovery/TestPerRegionIndexWriteCache.java
index cd28627..35b607e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/recovery/TestPerRegionIndexWriteCache.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/recovery/TestPerRegionIndexWriteCache.java
@@ -65,8 +65,8 @@ public class TestPerRegionIndexWriteCache {
p2.add(family, qual, val);
}
- HRegion r1;
- HRegion r2;
+ HRegion r1; // FIXME: Uses private type
+ HRegion r2; // FIXME: Uses private type
WAL wal;
@SuppressWarnings("deprecation")
@@ -212,4 +212,4 @@ public class TestPerRegionIndexWriteCache {
// references around to these edits and have a memory leak
assertNull("Got an entry for a region we removed", cache.getEdits(r1));
}
-}
\ No newline at end of file
+}
[4/4] phoenix git commit: PHOENIX-1681 Use the new Region Interface
(Andrew Purtell)
Posted by en...@apache.org.
PHOENIX-1681 Use the new Region Interface (Andrew Purtell)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c2fe34f7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c2fe34f7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c2fe34f7
Branch: refs/heads/4.4-HBase-1.1
Commit: c2fe34f74f89c613bc1e0fed031512cedf1270b8
Parents: c7182d4
Author: Enis Soztutar <en...@apache.org>
Authored: Thu May 21 23:22:54 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri May 22 00:31:55 2015 -0700
----------------------------------------------------------------------
...ReplayWithIndexWritesAndCompressedWALIT.java | 4 +-
.../EndToEndCoveredColumnsIndexBuilderIT.java | 4 +-
.../IndexHalfStoreFileReaderGenerator.java | 9 +-
.../regionserver/IndexSplitTransaction.java | 65 +++++---------
.../hbase/regionserver/LocalIndexMerger.java | 16 ++--
.../hbase/regionserver/LocalIndexSplitter.java | 11 +--
.../coprocessor/BaseScannerRegionObserver.java | 26 +++---
.../GroupedAggregateRegionObserver.java | 13 +--
.../coprocessor/MetaDataEndpointImpl.java | 94 ++++++++++----------
.../phoenix/coprocessor/ScanRegionObserver.java | 17 ++--
.../coprocessor/SequenceRegionObserver.java | 16 ++--
.../UngroupedAggregateRegionObserver.java | 29 +++---
.../hbase/index/covered/data/LocalTable.java | 5 +-
.../write/ParallelWriterIndexCommitter.java | 8 +-
.../recovery/PerRegionIndexWriteCache.java | 10 +--
.../recovery/StoreFailuresInCachePolicy.java | 4 +-
.../TrackingParallelWriterIndexCommitter.java | 8 +-
.../phoenix/index/PhoenixIndexBuilder.java | 4 +-
.../apache/phoenix/index/PhoenixIndexCodec.java | 14 ++-
.../schema/stats/StatisticsCollector.java | 14 +--
.../phoenix/schema/stats/StatisticsScanner.java | 16 ++--
.../phoenix/schema/stats/StatisticsWriter.java | 16 ++--
.../java/org/apache/phoenix/util/IndexUtil.java | 38 ++++----
.../index/covered/TestLocalTableState.java | 8 +-
.../index/write/TestWALRecoveryCaching.java | 17 ++--
.../recovery/TestPerRegionIndexWriteCache.java | 6 +-
26 files changed, 230 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
index 3b8ff29..611ba68 100644
--- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
+++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
@@ -159,7 +159,7 @@ public class WALReplayWithIndexWritesAndCompressedWALIT {
}
/**
- * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
+ * Test writing edits into a region, closing it, splitting logs, opening Region again. Verify
* seqids.
* @throws Exception on failure
*/
@@ -183,7 +183,7 @@ public class WALReplayWithIndexWritesAndCompressedWALIT {
builder.build(htd);
// create the region + its WAL
- HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
+ HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd); // FIXME: Uses private type
region0.close();
region0.getWAL().close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index d90733f..6b2309e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -312,7 +312,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
HTable primary = new HTable(UTIL.getConfiguration(), tableNameBytes);
// overwrite the codec so we can verify the current state
- HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
+ Region region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
Indexer indexer =
(Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName());
CoveredColumnsIndexBuilder builder =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
index 1284dcf..94d5912 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -76,7 +76,7 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, Reader reader) throws IOException {
TableName tableName = ctx.getEnvironment().getRegion().getTableDesc().getTableName();
- HRegion region = ctx.getEnvironment().getRegion();
+ Region region = ctx.getEnvironment().getRegion();
HRegionInfo childRegion = region.getRegionInfo();
byte[] splitKey = null;
if (reader == null && r != null) {
@@ -109,7 +109,7 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
Pair<HRegionInfo, HRegionInfo> mergeRegions =
MetaTableAccessor.getRegionsFromMergeQualifier(ctx.getEnvironment()
.getRegionServerServices().getConnection(),
- region.getRegionName());
+ region.getRegionInfo().getRegionName());
if (mergeRegions == null || mergeRegions.getFirst() == null) return reader;
byte[] splitRow =
CellUtil.cloneRow(KeyValue.createKeyValueFromKey(r.getSplitKey()));
@@ -121,8 +121,9 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
childRegion = mergeRegions.getSecond();
regionStartKeyInHFile = mergeRegions.getSecond().getStartKey();
}
- splitKey = KeyValue.createFirstOnRow(region.getStartKey().length == 0 ?
- new byte[region.getEndKey().length] : region.getStartKey()).getKey();
+ splitKey = KeyValue.createFirstOnRow(region.getRegionInfo().getStartKey().length == 0 ?
+ new byte[region.getRegionInfo().getEndKey().length] :
+ region.getRegionInfo().getStartKey()).getKey();
} else {
HRegionInfo parentRegion = HRegionInfo.getHRegionInfo(result);
regionStartKeyInHFile =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
index 3057a14..71bc520 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
@@ -65,31 +65,8 @@ import org.apache.zookeeper.data.Stat;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-/**
- * Executes region split as a "transaction". Call {@link #prepare()} to setup
- * the transaction, {@link #execute(Server, RegionServerServices)} to run the
- * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails.
- *
- * <p>Here is an example of how you would use this class:
- * <pre>
- * SplitTransaction st = new SplitTransaction(this.conf, parent, midKey)
- * if (!st.prepare()) return;
- * try {
- * st.execute(server, services);
- * } catch (IOException ioe) {
- * try {
- * st.rollback(server, services);
- * return;
- * } catch (RuntimeException e) {
- * myAbortable.abort("Failed split, abort");
- * }
- * }
- * </Pre>
- * <p>This class is not thread safe. Caller needs ensure split is run by
- * one thread only.
- */
@InterfaceAudience.Private
-public class IndexSplitTransaction extends SplitTransaction {
+public class IndexSplitTransaction extends SplitTransactionImpl { // FIXME: Extends private type
private static final Log LOG = LogFactory.getLog(IndexSplitTransaction.class);
/*
@@ -154,9 +131,9 @@ public class IndexSplitTransaction extends SplitTransaction {
* @param r Region to split
* @param splitrow Row to split around
*/
- public IndexSplitTransaction(final HRegion r, final byte [] splitrow) {
+ public IndexSplitTransaction(final Region r, final byte [] splitrow) {
super(r , splitrow);
- this.parent = r;
+ this.parent = (HRegion)r;
this.splitrow = splitrow;
}
@@ -217,7 +194,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* @return Regions created
*/
@Override
- /* package */PairOfSameType<HRegion> createDaughters(final Server server,
+ /* package */PairOfSameType<Region> createDaughters(final Server server,
final RegionServerServices services) throws IOException {
LOG.info("Starting split of region " + this.parent);
if ((server != null && server.isStopped()) ||
@@ -244,14 +221,14 @@ public class IndexSplitTransaction extends SplitTransaction {
server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
this.fileSplitTimeout);
- PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
+ PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
List<Mutation> metaEntries = new ArrayList<Mutation>();
if (this.parent.getCoprocessorHost() != null) {
if (this.parent.getCoprocessorHost().
preSplitBeforePONR(this.splitrow, metaEntries)) {
throw new IOException("Coprocessor bypassing region "
- + this.parent.getRegionNameAsString() + " split.");
+ + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
}
try {
for (Mutation p : metaEntries) {
@@ -303,7 +280,7 @@ public class IndexSplitTransaction extends SplitTransaction {
}
@Override
- public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
+ public PairOfSameType<Region> stepsBeforePONR(final Server server,
final RegionServerServices services, boolean testing) throws IOException {
// Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
// have zookeeper so don't do zk stuff if server or zookeeper is null
@@ -313,7 +290,7 @@ public class IndexSplitTransaction extends SplitTransaction {
parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
} catch (KeeperException e) {
throw new IOException("Failed creating PENDING_SPLIT znode on " +
- this.parent.getRegionNameAsString(), e);
+ this.parent.getRegionInfo().getRegionNameAsString(), e);
}
}
this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
@@ -367,12 +344,12 @@ public class IndexSplitTransaction extends SplitTransaction {
// stuff in fs that needs cleanup -- a storefile or two. Thats why we
// add entry to journal BEFORE rather than AFTER the change.
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
+ Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
// Ditto
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
- return new PairOfSameType<HRegion>(a, b);
+ Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
+ return new PairOfSameType<Region>(a, b);
}
/**
@@ -387,7 +364,7 @@ public class IndexSplitTransaction extends SplitTransaction {
*/
@Override
/* package */void openDaughters(final Server server,
- final RegionServerServices services, HRegion a, HRegion b)
+ final RegionServerServices services, Region a, Region b)
throws IOException {
boolean stopped = server != null && server.isStopped();
boolean stopping = services != null && services.isStopping();
@@ -400,8 +377,8 @@ public class IndexSplitTransaction extends SplitTransaction {
" because stopping=" + stopping + ", stopped=" + stopped);
} else {
// Open daughters in parallel.
- DaughterOpener aOpener = new DaughterOpener(server, a);
- DaughterOpener bOpener = new DaughterOpener(server, b);
+ DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
+ DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
aOpener.start();
bOpener.start();
try {
@@ -444,7 +421,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* Call {@link #rollback(Server, RegionServerServices)}
*/
/* package */void transitionZKNode(final Server server,
- final RegionServerServices services, HRegion a, HRegion b)
+ final RegionServerServices services, Region a, Region b)
throws IOException {
// Tell master about split by updating zk. If we fail, abort.
if (server != null && server.getZooKeeper() != null) {
@@ -556,7 +533,7 @@ public class IndexSplitTransaction extends SplitTransaction {
Thread.currentThread().interrupt();
}
throw new IOException("Failed getting SPLITTING znode on "
- + parent.getRegionNameAsString(), e);
+ + parent.getRegionInfo().getRegionNameAsString(), e);
}
}
@@ -572,10 +549,10 @@ public class IndexSplitTransaction extends SplitTransaction {
* @see #rollback(Server, RegionServerServices)
*/
@Override
- public PairOfSameType<HRegion> execute(final Server server,
+ public PairOfSameType<Region> execute(final Server server,
final RegionServerServices services)
throws IOException {
- PairOfSameType<HRegion> regions = createDaughters(server, services);
+ PairOfSameType<Region> regions = createDaughters(server, services);
if (this.parent.getCoprocessorHost() != null) {
this.parent.getCoprocessorHost().preSplitAfterPONR();
}
@@ -583,8 +560,8 @@ public class IndexSplitTransaction extends SplitTransaction {
}
@Override
- public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
- final RegionServerServices services, PairOfSameType<HRegion> regions)
+ public PairOfSameType<Region> stepsAfterPONR(final Server server,
+ final RegionServerServices services, PairOfSameType<Region> regions)
throws IOException {
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
@@ -871,7 +848,7 @@ public class IndexSplitTransaction extends SplitTransaction {
this.parent.initialize();
} catch (IOException e) {
LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
- this.parent.getRegionNameAsString(), e);
+ this.parent.getRegionInfo().getRegionNameAsString(), e);
throw new RuntimeException(e);
}
break;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
index add9b72..e361343 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
@@ -38,12 +38,12 @@ public class LocalIndexMerger extends BaseRegionServerObserver {
private static final Log LOG = LogFactory.getLog(LocalIndexMerger.class);
- private RegionMergeTransaction rmt = null;
- private HRegion mergedRegion = null;
+ private RegionMergeTransactionImpl rmt = null; // FIXME: Use of private type
+ private HRegion mergedRegion = null; // FIXME: Use of private type
@Override
public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
- HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException {
+ Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException {
HTableDescriptor tableDesc = regionA.getTableDesc();
if (SchemaUtil.isSystemTable(tableDesc.getName())) {
return;
@@ -56,14 +56,14 @@ public class LocalIndexMerger extends BaseRegionServerObserver {
TableName indexTable =
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName()));
if (!MetaTableAccessor.tableExists(rs.getConnection(), indexTable)) return;
- HRegion indexRegionA = IndexUtil.getIndexRegion(regionA, ctx.getEnvironment());
+ Region indexRegionA = IndexUtil.getIndexRegion(regionA, ctx.getEnvironment());
if (indexRegionA == null) {
LOG.warn("Index region corresponindg to data region " + regionA
+ " not in the same server. So skipping the merge.");
ctx.bypass();
return;
}
- HRegion indexRegionB = IndexUtil.getIndexRegion(regionB, ctx.getEnvironment());
+ Region indexRegionB = IndexUtil.getIndexRegion(regionB, ctx.getEnvironment());
if (indexRegionB == null) {
LOG.warn("Index region corresponindg to region " + regionB
+ " not in the same server. So skipping the merge.");
@@ -71,7 +71,7 @@ public class LocalIndexMerger extends BaseRegionServerObserver {
return;
}
try {
- rmt = new RegionMergeTransaction(indexRegionA, indexRegionB, false);
+ rmt = new RegionMergeTransactionImpl(indexRegionA, indexRegionB, false);
if (!rmt.prepare(rss)) {
LOG.error("Prepare for the index regions merge [" + indexRegionA + ","
+ indexRegionB + "] failed. So returning null. ");
@@ -97,7 +97,7 @@ public class LocalIndexMerger extends BaseRegionServerObserver {
@Override
public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
- HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException {
+ Region regionA, Region regionB, Region mergedRegion) throws IOException {
if (rmt != null && this.mergedRegion != null) {
RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
@@ -107,7 +107,7 @@ public class LocalIndexMerger extends BaseRegionServerObserver {
@Override
public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
- HRegion regionA, HRegion regionB) throws IOException {
+ Region regionA, Region regionB) throws IOException {
HRegionServer rs = (HRegionServer) ctx.getEnvironment().getRegionServerServices();
try {
if (rmt != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
index 9af8251..7882e25 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
@@ -52,8 +52,8 @@ public class LocalIndexSplitter extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(LocalIndexSplitter.class);
- private SplitTransaction st = null;
- private PairOfSameType<HRegion> daughterRegions = null;
+ private SplitTransactionImpl st = null; // FIXME: Uses private type
+ private PairOfSameType<Region> daughterRegions = null;
private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
private static final int SPLIT_TXN_MINIMUM_SUPPORTED_VERSION = VersionUtil
.encodeVersion("0.98.9");
@@ -74,17 +74,18 @@ public class LocalIndexSplitter extends BaseRegionObserver {
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName()));
if (!MetaTableAccessor.tableExists(rss.getConnection(), indexTable)) return;
- HRegion indexRegion = IndexUtil.getIndexRegion(environment);
+ Region indexRegion = IndexUtil.getIndexRegion(environment);
if (indexRegion == null) {
LOG.warn("Index region corresponindg to data region " + environment.getRegion()
+ " not in the same server. So skipping the split.");
ctx.bypass();
return;
}
+ // FIXME: Uses private type
try {
int encodedVersion = VersionUtil.encodeVersion(environment.getHBaseVersion());
if(encodedVersion >= SPLIT_TXN_MINIMUM_SUPPORTED_VERSION) {
- st = new SplitTransaction(indexRegion, splitKey);
+ st = new SplitTransactionImpl(indexRegion, splitKey);
st.useZKForAssignment =
environment.getConfiguration().getBoolean("hbase.assignment.usezk",
true);
@@ -98,7 +99,7 @@ public class LocalIndexSplitter extends BaseRegionObserver {
ctx.bypass();
return;
}
- indexRegion.forceSplit(splitKey);
+ ((HRegion)indexRegion).forceSplit(splitKey);
daughterRegions = st.stepsBeforePONR(rss, rss, false);
HRegionInfo copyOfParent = new HRegionInfo(indexRegion.getRegionInfo());
copyOfParent.setOffline(true);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fc74968..d9e64e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
@@ -114,12 +114,12 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
- private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException {
+ private static void throwIfScanOutOfRegion(Scan scan, Region region) throws DoNotRetryIOException {
boolean isLocalIndex = ScanUtil.isLocalIndex(scan);
byte[] lowerInclusiveScanKey = scan.getStartRow();
byte[] upperExclusiveScanKey = scan.getStopRow();
- byte[] lowerInclusiveRegionKey = region.getStartKey();
- byte[] upperExclusiveRegionKey = region.getEndKey();
+ byte[] lowerInclusiveRegionKey = region.getRegionInfo().getStartKey();
+ byte[] upperExclusiveRegionKey = region.getRegionInfo().getEndKey();
boolean isStaleRegionBoundaries;
if (isLocalIndex) {
byte[] expectedUpperRegionKey = scan.getAttribute(EXPECTED_UPPER_REGION_KEY);
@@ -201,7 +201,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
}
} catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t);
return null; // impossible
}
}
@@ -221,7 +221,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
final RegionScanner s, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
- final HRegion dataRegion, final IndexMaintainer indexMaintainer,
+ final Region dataRegion, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final TupleProjector projector,
final ImmutableBytesWritable ptr) {
return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
@@ -246,7 +246,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
final Expression[] arrayFuncRefs, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
- final HRegion dataRegion, final IndexMaintainer indexMaintainer,
+ final Region dataRegion, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final KeyValueSchema kvSchema,
final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
final ImmutableBytesWritable ptr) {
@@ -257,7 +257,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
try {
return s.next(results);
} catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
}
}
@@ -267,7 +267,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
try {
return s.next(result, scannerContext);
} catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
}
}
@@ -319,7 +319,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
// There is a scanattribute set to retrieve the specific array element
return next;
} catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
}
}
@@ -346,10 +346,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
// There is a scanattribute set to retrieve the specific array element
return next;
- } catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
- }
+ }
}
private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 19a1663..d613688 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
@@ -112,8 +112,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
* For local indexes, we need to set an offset on row key expressions to skip
* the region start key.
*/
- HRegion region = c.getEnvironment().getRegion();
- offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
+ Region region = c.getEnvironment().getRegion();
+ offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
+ region.getRegionInfo().getEndKey().length;
ScanUtil.setRowKeyOffset(scan, offset);
}
@@ -128,7 +129,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
TupleProjector tupleProjector = null;
- HRegion dataRegion = null;
+ Region dataRegion = null;
byte[][] viewConstants = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
@@ -415,7 +416,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
}
- HRegion region = c.getEnvironment().getRegion();
+ Region region = c.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (scanner) {
@@ -495,7 +496,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
// If we're calculating no aggregate functions, we can exit at the
// start of a new row. Otherwise, we have to wait until an agg
int countOffset = rowAggregators.length == 0 ? 1 : 0;
- HRegion region = c.getEnvironment().getRegion();
+ Region region = c.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (scanner) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index e613007..39a4956 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -99,8 +99,8 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -404,7 +404,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
try {
// TODO: check that key is within region.getStartKey() and region.getEndKey()
// and return special code to force client to lookup region from meta.
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
@@ -434,7 +434,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
- private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+ private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
long clientTimeStamp) throws IOException, SQLException {
Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
RegionScanner scanner = region.getScanner(scan);
@@ -464,7 +464,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
- private List<PFunction> buildFunctions(List<byte[]> keys, HRegion region,
+ private List<PFunction> buildFunctions(List<byte[]> keys, Region region,
long clientTimeStamp) throws IOException, SQLException {
List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size());
for (byte[] key : keys) {
@@ -914,7 +914,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp);
}
- private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+ private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
return null;
@@ -942,7 +942,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
- private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+ private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, Region region,
long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
return null;
@@ -989,7 +989,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
throws IOException, SQLException {
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
@@ -1008,7 +1008,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key,
ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
throws IOException, SQLException {
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PFunction function = (PFunction)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
@@ -1051,7 +1051,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
schemaName, tableName);
byte[] parentKey = parentTableName == null ? null : lockKey;
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
@@ -1115,7 +1115,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return;
}
}
- // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
+ // TODO: Switch this to Region#batchMutate when we want to support indexes on the
// system
// table. Basically, we get all the locks that we don't already hold for all the
// tableMetadata rows. This ensures we don't have deadlock situations (ensuring
@@ -1125,7 +1125,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// on the system table. This is an issue because of the way we manage batch mutation
// in the
// Indexer.
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+ HConstants.NO_NONCE);
// Invalidate the cache - the next getTable call will add it
// TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
@@ -1151,9 +1152,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
- private static void acquireLock(HRegion region, byte[] key, List<RowLock> locks)
+ private static void acquireLock(Region region, byte[] key, List<RowLock> locks)
throws IOException {
- RowLock rowLock = region.getRowLock(key);
+ RowLock rowLock = region.getRowLock(key, true);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
@@ -1167,7 +1168,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
* TODO: should we pass a timestamp here?
*/
@SuppressWarnings("deprecation")
- private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table) throws IOException {
+ private TableViewFinderResult findChildViews(Region region, byte[] tenantId, PTable table) throws IOException {
byte[] schemaName = table.getSchemaName().getBytes();
byte[] tableName = table.getTableName().getBytes();
boolean isMultiTenant = table.isMultiTenant();
@@ -1256,7 +1257,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
schemaName, tableName);
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
@@ -1280,7 +1281,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
// Commit the list of deletion.
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+ HConstants.NO_NONCE);
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
for (ImmutableBytesPtr ckey : invalidateList) {
metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
@@ -1309,7 +1311,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -1435,7 +1437,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static interface ColumnMutator {
MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
- List<Mutation> tableMetadata, HRegion region,
+ List<Mutation> tableMetadata, Region region,
List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException,
SQLException;
}
@@ -1449,7 +1451,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
try {
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
return result;
@@ -1535,7 +1537,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return result;
}
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+ HConstants.NO_NONCE);
// Invalidate from cache
for (ImmutableBytesPtr invalidateKey : invalidateList) {
metaDataCache.invalidate(invalidateKey);
@@ -1563,7 +1566,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
@Override
public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
- List<Mutation> tableMetaData, HRegion region,
+ List<Mutation> tableMetaData, Region region,
List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) {
byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
@@ -1647,14 +1650,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// get the co-processor environment
// TODO: check that key is within region.getStartKey() and region.getEndKey()
// and return special code to force client to lookup region from meta.
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
/*
* Lock directly on key, though it may be an index table. This will just prevent a table
* from getting rebuilt too often.
*/
final boolean wasLocked = (rowLock != null);
if (!wasLocked) {
- rowLock = region.getRowLock(key);
+ rowLock = region.getRowLock(key, true);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
@@ -1689,7 +1692,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException {
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
Collections.sort(keys, new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
@@ -1700,11 +1703,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
* Lock directly on key, though it may be an index table. This will just prevent a table
* from getting rebuilt too often.
*/
- List<RowLock> rowLocks = new ArrayList<HRegion.RowLock>(keys.size());;
+ List<RowLock> rowLocks = new ArrayList<Region.RowLock>(keys.size());;
try {
- rowLocks = new ArrayList<HRegion.RowLock>(keys.size());
+ rowLocks = new ArrayList<Region.RowLock>(keys.size());
for (int i = 0; i < keys.size(); i++) {
- HRegion.RowLock rowLock = region.getRowLock(keys.get(i));
+ Region.RowLock rowLock = region.getRowLock(keys.get(i), true);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on "
+ Bytes.toStringBinary(keys.get(i)));
@@ -1737,7 +1740,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if(functionsAvailable.size() == numFunctions) return functionsAvailable;
return null;
} finally {
- for (HRegion.RowLock lock : rowLocks) {
+ for (Region.RowLock lock : rowLocks) {
lock.release();
}
rowLocks.clear();
@@ -1756,7 +1759,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
@Override
public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
- List<Mutation> tableMetaData, HRegion region,
+ List<Mutation> tableMetaData, Region region,
List<ImmutableBytesPtr> invalidateList, List<RowLock> locks)
throws IOException, SQLException {
byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
@@ -1904,7 +1907,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
@@ -1928,7 +1931,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
PIndexState newState =
PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
- RowLock rowLock = region.getRowLock(key);
+ RowLock rowLock = region.getRowLock(key, true);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
@@ -2019,7 +2022,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
tableMetadata.add(p);
}
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+ region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+ HConstants.NO_NONCE);
// Invalidate from cache
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
metaDataCache.invalidate(cacheKey);
@@ -2044,9 +2048,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
- private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) {
- byte[] startKey = region.getStartKey();
- byte[] endKey = region.getEndKey();
+ private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, Region region) {
+ byte[] startKey = region.getRegionInfo().getStartKey();
+ byte[] endKey = region.getRegionInfo().getEndKey();
if (Bytes.compareTo(startKey, key) <= 0
&& (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
endKey) < 0)) {
@@ -2056,9 +2060,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
EnvironmentEdgeManager.currentTimeMillis(), null);
}
- private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, HRegion region) {
- byte[] startKey = region.getStartKey();
- byte[] endKey = region.getEndKey();
+ private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, Region region) {
+ byte[] startKey = region.getRegionInfo().getStartKey();
+ byte[] endKey = region.getRegionInfo().getEndKey();
if (Bytes.compareTo(startKey, key) <= 0
&& (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
endKey) < 0)) {
@@ -2135,7 +2139,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] tenantId = request.getTenantId().toByteArray();
List<String> functionNames = new ArrayList<>(request.getFunctionNamesCount());
try {
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
List<ByteString> functionNamesList = request.getFunctionNamesList();
List<Long> functionTimestampsList = request.getFunctionTimestampsList();
List<byte[]> keys = new ArrayList<byte[]>(request.getFunctionNamesCount());
@@ -2189,7 +2193,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
@@ -2225,7 +2229,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
// Don't store function info for temporary functions.
if(!temporaryFunction) {
- region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet());
+ region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
}
// Invalidate the cache - the next getFunction call will add it
@@ -2259,7 +2263,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
@@ -2278,7 +2282,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(MetaDataMutationResult.toProto(result));
return;
}
- region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet());
+ region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData);
@@ -2322,7 +2326,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
EnvironmentEdgeManager.currentTimeMillis(), null);
}
invalidateList.add(new FunctionBytesPtr(keys.get(0)));
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp);
List<Cell> results = Lists.newArrayList();
try (RegionScanner scanner = region.getScanner(scan);) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 77e124d..54c688a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
@@ -176,8 +176,9 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
* For local indexes, we need to set an offset on row key expressions to skip
* the region start key.
*/
- HRegion region = c.getEnvironment().getRegion();
- offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
+ Region region = c.getEnvironment().getRegion();
+ offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
+ region.getRegionInfo().getEndKey().length;
ScanUtil.setRowKeyOffset(scan, offset);
}
@@ -187,7 +188,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan(
scan, innerScanner, arrayKVRefs);
TupleProjector tupleProjector = null;
- HRegion dataRegion = null;
+ Region dataRegion = null;
IndexMaintainer indexMaintainer = null;
byte[][] viewConstants = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
@@ -231,7 +232,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), tenantId);
long estSize = iterator.getEstimatedByteSize();
final MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
- final HRegion region = c.getEnvironment().getRegion();
+ final Region region = c.getEnvironment().getRegion();
region.startRegionOperation();
try {
// Once we return from the first call to next, we've run through and cached
@@ -241,7 +242,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
long actualSize = iterator.getByteSize();
chunk.resize(actualSize);
} catch (Throwable t) {
- ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
return null;
} finally {
region.closeRegionOperation();
@@ -273,7 +274,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
tuple = iterator.next();
return !isFilterDone();
} catch (Throwable t) {
- ServerUtil.throwIOException(region.getRegionNameAsString(), t);
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
return false;
}
}
@@ -288,7 +289,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
iterator.close();
}
} catch (SQLException e) {
- ServerUtil.throwIOException(region.getRegionNameAsString(), e);
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
} finally {
chunk.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 7953933..9b5f040 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -88,9 +88,9 @@ public class SequenceRegionObserver extends BaseRegionObserver {
QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf)));
}
- private static void acquireLock(HRegion region, byte[] key, List<RowLock> locks)
+ private static void acquireLock(Region region, byte[] key, List<RowLock> locks)
throws IOException {
- RowLock rowLock = region.getRowLock(key);
+ RowLock rowLock = region.getRowLock(key, true);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
@@ -114,7 +114,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
// We need to set this to prevent region.increment from being called
e.bypass();
e.complete();
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
byte[] row = increment.getRow();
List<RowLock> locks = Lists.newArrayList();
TimeRange tr = increment.getTimeRange();
@@ -251,7 +251,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
}
// update the KeyValues on the server
Mutation[] mutations = new Mutation[]{put};
- region.batchMutate(mutations);
+ region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
// return a Result with the updated KeyValues
return Result.create(cells);
} finally {
@@ -345,7 +345,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
// We need to set this to prevent region.append from being called
e.bypass();
e.complete();
- HRegion region = env.getRegion();
+ Region region = env.getRegion();
byte[] row = append.getRow();
List<RowLock> locks = Lists.newArrayList();
region.startRegionOperation();
@@ -400,7 +400,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
}
}
Mutation[] mutations = new Mutation[]{m};
- region.batchMutate(mutations);
+ region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
long serverTimestamp = MetaDataUtil.getClientTimeStamp(m);
// Return result with single KeyValue. The only piece of information
// the client cares about is the timestamp, which is the timestamp of
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 2d6d98a..d5cc486 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -48,8 +48,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -125,7 +125,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
}
- private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
+ private static void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
if (indexUUID != null) {
for (Mutation m : mutations) {
m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
@@ -133,7 +133,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
Mutation[] mutationArray = new Mutation[mutations.size()];
// TODO: should we use the one that is all or none?
- region.batchMutate(mutations.toArray(mutationArray));
+ region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
}
public static void serializeIntoScan(Scan scan) {
@@ -158,7 +158,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
int offset = 0;
- HRegion region = c.getEnvironment().getRegion();
+ Region region = c.getEnvironment().getRegion();
long ts = scan.getTimeRange().getMax();
StatisticsCollector stats = null;
if(ScanUtil.isAnalyzeTable(scan)) {
@@ -172,7 +172,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
* For local indexes, we need to set an offset on row key expressions to skip
* the region start key.
*/
- offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
+ offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
+ region.getRegionInfo().getEndKey().length;
ScanUtil.setRowKeyOffset(scan, offset);
}
@@ -212,7 +213,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
ptr = new ImmutableBytesWritable();
}
TupleProjector tupleProjector = null;
- HRegion dataRegion = null;
+ Region dataRegion = null;
byte[][] viewConstants = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
boolean localIndexScan = ScanUtil.isLocalIndex(scan);
@@ -279,8 +280,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
results);
Put put = maintainer.buildUpdateMutation(kvBuilder,
valueGetter, ptr, ts,
- c.getEnvironment().getRegion().getStartKey(),
- c.getEnvironment().getRegion().getEndKey());
+ c.getEnvironment().getRegion().getRegionInfo().getStartKey(),
+ c.getEnvironment().getRegion().getRegionInfo().getEndKey());
indexMutations.add(put);
}
}
@@ -391,7 +392,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
} catch (ConstraintViolationException e) {
// Log and ignore in count
logger.error(LogUtil.addCustomAnnotations("Failed to create row in " +
- region.getRegionNameAsString() + " with values " +
+ region.getRegionInfo().getRegionNameAsString() + " with values " +
SchemaUtil.toString(values),
ScanUtil.getCustomAnnotations(scan)), e);
continue;
@@ -479,9 +480,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
private void commitIndexMutations(final ObserverContext<RegionCoprocessorEnvironment> c,
- HRegion region, List<Mutation> indexMutations) throws IOException {
+ Region region, List<Mutation> indexMutations) throws IOException {
// Get indexRegion corresponding to data region
- HRegion indexRegion = IndexUtil.getIndexRegion(c.getEnvironment());
+ Region indexRegion = IndexUtil.getIndexRegion(c.getEnvironment());
if (indexRegion != null) {
commitBatch(indexRegion, indexMutations, null);
} else {
@@ -493,7 +494,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
table = c.getEnvironment().getTable(indexTable);
table.batch(indexMutations);
} catch (InterruptedException ie) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(),
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(),
ie);
} finally {
if (table != null) table.close();
@@ -534,9 +535,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
@Override
- public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
+ public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, Region l, Region r)
throws IOException {
- HRegion region = e.getEnvironment().getRegion();
+ Region region = e.getEnvironment().getRegion();
TableName table = region.getRegionInfo().getTable();
StatisticsCollector stats = null;
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
index 71cc1d6..549fe8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -24,12 +24,11 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -60,7 +59,7 @@ public class LocalTable implements LocalHBaseState {
Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
s.setStartRow(row);
s.setStopRow(row);
- HRegion region = this.env.getRegion();
+ Region region = this.env.getRegion();
RegionScanner scanner = region.getScanner(s);
List<Cell> kvs = new ArrayList<Cell>(1);
boolean more = scanner.next(kvs);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index f72dec0..56bf637 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -21,11 +21,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
@@ -150,10 +151,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
// as well.
try {
if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
- HRegion indexRegion = IndexUtil.getIndexRegion(env);
+ Region indexRegion = IndexUtil.getIndexRegion(env);
if (indexRegion != null) {
throwFailureIfDone();
- indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
+ indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java
index 4d5f667..26da2d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
@@ -32,8 +32,8 @@ import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
public class PerRegionIndexWriteCache {
- private Map<HRegion, Multimap<HTableInterfaceReference, Mutation>> cache =
- new HashMap<HRegion, Multimap<HTableInterfaceReference, Mutation>>();
+ private Map<Region, Multimap<HTableInterfaceReference, Mutation>> cache =
+ new HashMap<Region, Multimap<HTableInterfaceReference, Mutation>>();
/**
@@ -43,7 +43,7 @@ public class PerRegionIndexWriteCache {
* @return Get the edits for the given region. Returns <tt>null</tt> if there are no pending edits
* for the region
*/
- public Multimap<HTableInterfaceReference, Mutation> getEdits(HRegion region) {
+ public Multimap<HTableInterfaceReference, Mutation> getEdits(Region region) {
return cache.remove(region);
}
@@ -52,7 +52,7 @@ public class PerRegionIndexWriteCache {
* @param table
* @param collection
*/
- public void addEdits(HRegion region, HTableInterfaceReference table,
+ public void addEdits(Region region, HTableInterfaceReference table,
Collection<Mutation> collection) {
Multimap<HTableInterfaceReference, Mutation> edits = cache.get(region);
if (edits == null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
index f36affb..189f970 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import com.google.common.collect.Multimap;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
@@ -41,7 +41,7 @@ public class StoreFailuresInCachePolicy implements IndexFailurePolicy {
private KillServerOnFailurePolicy delegate;
private PerRegionIndexWriteCache cache;
- private HRegion region;
+ private Region region;
/**
* @param failedIndexEdits cache to update when we find a failure
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index 9171b53..b1b2656 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -23,11 +23,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.phoenix.hbase.index.CapturingAbortable;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
@@ -154,10 +155,11 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
// index is pretty hacky. If we're going to keep this, we should revisit that
// as well.
if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
- HRegion indexRegion = IndexUtil.getIndexRegion(env);
+ Region indexRegion = IndexUtil.getIndexRegion(env);
if (indexRegion != null) {
throwFailureIfDone();
- indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
+ indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
return Boolean.TRUE;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index b5e6a63..7a45e21 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
@@ -73,7 +73,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
scanRanges.initializeScan(scan);
scan.setFilter(scanRanges.getSkipScanFilter());
- HRegion region = this.env.getRegion();
+ Region region = this.env.getRegion();
RegionScanner scanner = region.getScanner(scan);
// Run through the scanner using internal nextRaw method
region.startRegionOperation();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2fe34f7/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 99e26d1..222aefb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -24,9 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
@@ -166,14 +164,14 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
Mutation mutation = null;
if (upsert) {
mutation =
- maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state
- .getCurrentTimestamp(), env.getRegion().getStartKey(), env
- .getRegion().getEndKey());
+ maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(),
+ env.getRegion().getRegionInfo().getStartKey(),
+ env.getRegion().getRegionInfo().getEndKey());
} else {
mutation =
- maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state
- .getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion()
- .getStartKey(), env.getRegion().getEndKey());
+ maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(),
+ state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(),
+ env.getRegion().getRegionInfo().getEndKey());
}
indexUpdate.setUpdate(mutation);
if (scanner != null) {
[2/4] phoenix git commit: PHOENIX-1763 Support building with
HBase-1.1.0
Posted by en...@apache.org.
PHOENIX-1763 Support building with HBase-1.1.0
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c7182d49
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c7182d49
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c7182d49
Branch: refs/heads/4.4-HBase-1.1
Commit: c7182d490d6d271e8411d7a0cbe59e52a0b12c22
Parents: bf01eb2
Author: Enis Soztutar <en...@apache.org>
Authored: Thu May 21 23:08:26 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri May 22 00:31:45 2015 -0700
----------------------------------------------------------------------
phoenix-core/pom.xml | 17 +++--
.../regionserver/IndexHalfStoreFileReader.java | 31 ++++++--
.../regionserver/IndexSplitTransaction.java | 39 ++++++++--
.../hbase/regionserver/LocalIndexMerger.java | 3 +-
.../cache/aggcache/SpillableGroupByCache.java | 13 +++-
.../phoenix/coprocessor/BaseRegionScanner.java | 12 +--
.../coprocessor/BaseScannerRegionObserver.java | 77 +++++++++++---------
.../coprocessor/DelegateRegionScanner.java | 23 ++++--
.../GroupedAggregateRegionObserver.java | 53 ++++++++------
.../coprocessor/HashJoinRegionScanner.java | 60 ++++++++-------
.../coprocessor/MetaDataRegionObserver.java | 23 +++---
.../phoenix/coprocessor/ScanRegionObserver.java | 11 ++-
.../UngroupedAggregateRegionObserver.java | 55 +++++++-------
.../hbase/index/covered/data/LocalTable.java | 2 +-
.../index/covered/filter/FamilyOnlyFilter.java | 6 +-
.../index/scanner/FilteredKeyValueScanner.java | 2 +-
.../phoenix/index/PhoenixIndexBuilder.java | 6 +-
.../iterate/RegionScannerResultIterator.java | 9 ++-
.../phoenix/schema/stats/StatisticsScanner.java | 10 ++-
.../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 6 +-
.../index/covered/TestLocalTableState.java | 1 -
.../covered/filter/TestFamilyOnlyFilter.java | 12 +--
.../index/write/TestWALRecoveryCaching.java | 4 +-
phoenix-flume/pom.xml | 9 ---
phoenix-pig/pom.xml | 31 +++++---
phoenix-spark/pom.xml | 7 ++
pom.xml | 41 ++++++++++-
27 files changed, 361 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 45b8d73..22e6b60 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -350,16 +350,25 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
</dependency>
<dependency>
@@ -369,18 +378,16 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
@@ -391,13 +398,11 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
- <version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
index 49e2022..9befc8c 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
@@ -47,11 +47,11 @@ import org.apache.phoenix.index.IndexMaintainer;
* that sort lowest and 'top' is the second half of the file with keys that sort greater than those
* of the bottom half. The top includes the split files midkey, of the key that follows if it does
* not exist in the file.
- *
+ *
* <p>
* This type works in tandem with the {@link Reference} type. This class is used reading while
* Reference is used writing.
- *
+ *
* <p>
* This file is not splitable. Calls to {@link #midkey()} return null.
*/
@@ -64,7 +64,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
private final byte[] splitkey;
private final byte[] splitRow;
private final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers;
- private final byte[][] viewConstants;
+ private final byte[][] viewConstants;
private final int offset;
private final HRegionInfo regionInfo;
private final byte[] regionStartKeyInHFile;
@@ -144,6 +144,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
final HFileScanner delegate = s;
public boolean atEnd = false;
+ @Override
public ByteBuffer getKey() {
if (atEnd) {
return null;
@@ -160,7 +161,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
// If it is top store file replace the StartKey of the Key with SplitKey
return getChangedKey(delegate.getKeyValue(), changeBottomKeys);
}
-
+
private ByteBuffer getChangedKey(Cell kv, boolean changeBottomKeys) {
// new KeyValue(row, family, qualifier, timestamp, type, value)
byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
@@ -183,6 +184,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return keyReplacedStartKey;
}
+ @Override
public String getKeyString() {
if (atEnd) {
return null;
@@ -190,6 +192,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return Bytes.toStringBinary(getKey());
}
+ @Override
public ByteBuffer getValue() {
if (atEnd) {
return null;
@@ -197,6 +200,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return delegate.getValue();
}
+ @Override
public String getValueString() {
if (atEnd) {
return null;
@@ -204,6 +208,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return Bytes.toStringBinary(getValue());
}
+ @Override
public Cell getKeyValue() {
if (atEnd) {
return null;
@@ -227,6 +232,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return changedKv;
}
+ @Override
public boolean next() throws IOException {
if (atEnd) {
return false;
@@ -248,10 +254,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
}
}
+ @Override
public boolean seekBefore(byte[] key) throws IOException {
return seekBefore(key, 0, key.length);
}
+ @Override
public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
if (top) {
@@ -282,6 +290,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
}
+ @Override
public boolean seekTo() throws IOException {
boolean b = delegate.seekTo();
if (!b) {
@@ -302,10 +311,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
}
}
+ @Override
public int seekTo(byte[] key) throws IOException {
return seekTo(key, 0, key.length);
}
+ @Override
public int seekTo(byte[] key, int offset, int length) throws IOException {
if (top) {
if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
@@ -342,10 +353,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
}
+ @Override
public int reseekTo(byte[] key) throws IOException {
return reseekTo(key, 0, key.length);
}
+ @Override
public int reseekTo(byte[] key, int offset, int length) throws IOException {
if (top) {
if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
@@ -375,11 +388,13 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
return reseekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
}
+ @Override
public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
return this.delegate.getReader();
}
// TODO: Need to change as per IndexHalfStoreFileReader
+ @Override
public boolean isSeeked() {
return this.delegate.isSeeked();
}
@@ -425,13 +440,13 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
/**
* In case of top half store, the passed key will be with the start key of the daughter region.
* But in the actual HFiles, the key will be with the start key of the old parent region. In
- * order to make the real seek in the HFiles, we need to build the old key.
- *
+ * order to make the real seek in the HFiles, we need to build the old key.
+ *
* The logic here is just replace daughter region start key with parent region start key
* in the key part.
- *
+ *
* @param key
- *
+ *
*/
private KeyValue getKeyPresentInHFiles(byte[] key) {
KeyValue keyValue = new KeyValue(key);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
index 920380b..3057a14 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
@@ -165,6 +165,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* @return <code>true</code> if the region is splittable else
* <code>false</code> if it is not (e.g. its already closed, etc.).
*/
+ @Override
public boolean prepare() {
if (!this.parent.isSplittable()) return false;
// Split key can be null if this region is unsplittable; i.e. has refs.
@@ -215,6 +216,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* Call {@link #rollback(Server, RegionServerServices)}
* @return Regions created
*/
+ @Override
/* package */PairOfSameType<HRegion> createDaughters(final Server server,
final RegionServerServices services) throws IOException {
LOG.info("Starting split of region " + this.parent);
@@ -288,16 +290,19 @@ public class IndexSplitTransaction extends SplitTransaction {
if (metaEntries == null || metaEntries.isEmpty()) {
MetaTableAccessor.splitRegion(server.getConnection(), parent.getRegionInfo(),
daughterRegions.getFirst().getRegionInfo(),
- daughterRegions.getSecond().getRegionInfo(), server.getServerName());
+ daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
+ parent.getTableDesc().getRegionReplication());
} else {
offlineParentInMetaAndputMetaEntries(server.getConnection(),
parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
- .getSecond().getRegionInfo(), server.getServerName(), metaEntries);
+ .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
+ parent.getTableDesc().getRegionReplication());
}
}
return daughterRegions;
}
+ @Override
public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
final RegionServerServices services, boolean testing) throws IOException {
// Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
@@ -380,6 +385,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* @throws IOException If thrown, transaction failed.
* Call {@link #rollback(Server, RegionServerServices)}
*/
+ @Override
/* package */void openDaughters(final Server server,
final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
@@ -565,6 +571,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* @throws IOException
* @see #rollback(Server, RegionServerServices)
*/
+ @Override
public PairOfSameType<HRegion> execute(final Server server,
final RegionServerServices services)
throws IOException {
@@ -575,6 +582,7 @@ public class IndexSplitTransaction extends SplitTransaction {
return stepsAfterPONR(server, services, regions);
}
+ @Override
public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
final RegionServerServices services, PairOfSameType<HRegion> regions)
throws IOException {
@@ -585,7 +593,7 @@ public class IndexSplitTransaction extends SplitTransaction {
private void offlineParentInMetaAndputMetaEntries(Connection conn,
HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
- ServerName serverName, List<Mutation> metaEntries) throws IOException {
+ ServerName serverName, List<Mutation> metaEntries, int regionReplication) throws IOException {
List<Mutation> mutations = metaEntries;
HRegionInfo copyOfParent = new HRegionInfo(parent);
copyOfParent.setOffline(true);
@@ -595,7 +603,7 @@ public class IndexSplitTransaction extends SplitTransaction {
Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
mutations.add(putParent);
-
+
//Puts for daughters
Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
@@ -604,9 +612,18 @@ public class IndexSplitTransaction extends SplitTransaction {
addLocation(putB, serverName, 1);
mutations.add(putA);
mutations.add(putB);
+
+ // Add empty locations for region replicas of daughters so that number of replicas can be
+ // cached whenever the primary region is looked up from meta
+ for (int i = 1; i < regionReplication; i++) {
+ addEmptyLocation(putA, i);
+ addEmptyLocation(putB, i);
+ }
+
MetaTableAccessor.mutateMetaTable(conn, mutations);
}
+ @Override
public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
Bytes.toBytes(sn.getHostAndPort()));
@@ -617,6 +634,13 @@ public class IndexSplitTransaction extends SplitTransaction {
return p;
}
+ private static Put addEmptyLocation(final Put p, int replicaId){
+ p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
+ p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), null);
+ p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
+ return p;
+ }
+
/*
* Open daughter region in its own thread.
* If we fail, abort this hosting server.
@@ -659,6 +683,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* @throws IOException
* @throws KeeperException
*/
+ @Override
void openDaughterRegion(final Server server, final HRegion daughter)
throws IOException, KeeperException {
HRegionInfo hri = daughter.getRegionInfo();
@@ -767,6 +792,7 @@ public class IndexSplitTransaction extends SplitTransaction {
this.family = family;
}
+ @Override
public Void call() throws IOException {
splitStoreFile(family, sf);
return null;
@@ -807,6 +833,7 @@ public class IndexSplitTransaction extends SplitTransaction {
* @return True if we successfully rolled back, false if we got to the point
* of no return and so now need to abort the server to minimize damage.
*/
+ @Override
@SuppressWarnings("deprecation")
public boolean rollback(final Server server, final RegionServerServices services)
throws IOException {
@@ -879,10 +906,12 @@ public class IndexSplitTransaction extends SplitTransaction {
return result;
}
+ @Override
HRegionInfo getFirstDaughter() {
return hri_a;
}
+ @Override
HRegionInfo getSecondDaughter() {
return hri_b;
}
@@ -971,7 +1000,7 @@ public class IndexSplitTransaction extends SplitTransaction {
return ZKAssign.transitionNode(zkw, parent, serverName,
beginState, endState, znodeVersion, payload);
}
-
+
public HRegion getParent() {
return this.parent;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
index f074df7..add9b72 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
@@ -81,7 +81,8 @@ public class LocalIndexMerger extends BaseRegionServerObserver {
this.mergedRegion = rmt.stepsBeforePONR(rss, rss, false);
rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(),
indexRegionA.getRegionInfo(), indexRegionB.getRegionInfo(),
- rss.getServerName(), metaEntries);
+ rss.getServerName(), metaEntries,
+ mergedRegion.getTableDesc().getRegionReplication());
} catch (Exception e) {
ctx.bypass();
LOG.warn("index regions merge failed with the exception ", e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index ce18cc2..69fc6f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -120,7 +120,7 @@ public class SpillableGroupByCache implements GroupByCache {
/**
* Instantiates a Loading LRU Cache that stores key / aggregator[] tuples used for group by queries
- *
+ *
* @param estSize
* @param estValueSize
* @param aggs
@@ -325,7 +325,7 @@ public class SpillableGroupByCache implements GroupByCache {
/**
* Closes cache and releases spill resources
- *
+ *
* @throws IOException
*/
@Override
@@ -358,7 +358,9 @@ public class SpillableGroupByCache implements GroupByCache {
@Override
public boolean next(List<Cell> results) throws IOException {
- if (!cacheIter.hasNext()) { return false; }
+ if (!cacheIter.hasNext()) {
+ return false;
+ }
Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
ImmutableBytesWritable key = ce.getKey();
Aggregator[] aggs = ce.getValue();
@@ -377,6 +379,11 @@ public class SpillableGroupByCache implements GroupByCache {
public long getMaxResultSize() {
return s.getMaxResultSize();
}
+
+ @Override
+ public int getBatch() {
+ return s.getBatch();
+ }
};
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index ff9ac76..828f776 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -22,14 +22,14 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
public abstract class BaseRegionScanner implements RegionScanner {
@Override
public boolean isFilterDone() {
- return false;
+ return false;
}
@Override
@@ -38,10 +38,10 @@ public abstract class BaseRegionScanner implements RegionScanner {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
-
+
@Override
public boolean reseek(byte[] row) throws IOException {
throw new DoNotRetryIOException("Unsupported");
@@ -58,7 +58,7 @@ public abstract class BaseRegionScanner implements RegionScanner {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- return next(result, limit);
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return next(result, scannerContext);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index a2269b4..fc74968 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -60,7 +61,7 @@ import com.google.common.collect.ImmutableList;
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
-
+
public static final String AGGREGATORS = "_Aggs";
public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
public static final String KEY_ORDERED_GROUP_BY_EXPRESSIONS = "_OrderedGroupByExpressions";
@@ -91,7 +92,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
* are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198.
*/
- public static final String CUSTOM_ANNOTATIONS = "_Annot";
+ public static final String CUSTOM_ANNOTATIONS = "_Annot";
/** Exposed for testing */
public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
@@ -111,8 +112,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public String toString() {
return this.getClass().getName();
}
-
-
+
+
private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException {
boolean isLocalIndex = ScanUtil.isLocalIndex(scan);
byte[] lowerInclusiveScanKey = scan.getStartRow();
@@ -136,7 +137,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
abstract protected boolean isRegionObserverFor(Scan scan);
abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
-
+
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s) throws IOException {
@@ -153,7 +154,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
/**
* Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown,
* to prevent the coprocessor from becoming blacklisted.
- *
+ *
*/
@Override
public final RegionScanner postScannerOpen(
@@ -165,10 +166,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
boolean success =false;
// Save the current span. When done with the child span, reset the span back to
- // what it was. Otherwise, this causes the thread local storing the current span
+ // what it was. Otherwise, this causes the thread local storing the current span
// to not be reset back to null causing catastrophic infinite loops
// and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596
- // TraceScope can't be used here because closing the scope will end up calling
+ // TraceScope can't be used here because closing the scope will end up calling
// currentSpan.stop() and that should happen only when we are closing the scanner.
final Span savedSpan = Trace.currentSpan();
final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan();
@@ -226,7 +227,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr);
}
-
+
/**
* Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
* re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
@@ -246,7 +247,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final Expression[] arrayFuncRefs, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final HRegion dataRegion, final IndexMaintainer indexMaintainer,
- final byte[][] viewConstants, final KeyValueSchema kvSchema,
+ final byte[][] viewConstants, final KeyValueSchema kvSchema,
final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
final ImmutableBytesWritable ptr) {
return new RegionScanner() {
@@ -262,9 +263,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
try {
- return s.next(result, limit);
+ return s.next(result, scannerContext);
} catch (Throwable t) {
ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
return false; // impossible
@@ -324,30 +325,31 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
@Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- try {
- boolean next = s.nextRaw(result, limit);
- if (result.size() == 0) {
- return next;
- }
- if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
- replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
- }
- if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
- IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
- tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
- }
- if (projector != null) {
- Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
- result.clear();
- result.add(tuple.getValue(0));
- }
- // There is a scanattribute set to retrieve the specific array element
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
+ throws IOException {
+ try {
+ boolean next = s.nextRaw(result, scannerContext);
+ if (result.size() == 0) {
return next;
- } catch (Throwable t) {
- ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
- return false; // impossible
}
+ if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
+ replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+ }
+ if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
+ IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
+ tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+ }
+ if (projector != null) {
+ Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
+ result.clear();
+ result.add(tuple.getValue(0));
+ }
+ // There is a scanattribute set to retrieve the specific array element
+ return next;
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
+ return false; // impossible
+ }
}
private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
@@ -387,6 +389,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public long getMaxResultSize() {
return s.getMaxResultSize();
}
+
+ @Override
+ public int getBatch() {
+ return s.getBatch();
+ }
};
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index f88a931..43c35a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
public class DelegateRegionScanner implements RegionScanner {
@@ -56,23 +57,33 @@ public class DelegateRegionScanner implements RegionScanner {
delegate.close();
}
+ @Override
public long getMaxResultSize() {
return delegate.getMaxResultSize();
}
- public boolean next(List<Cell> arg0, int arg1) throws IOException {
- return delegate.next(arg0, arg1);
+ @Override
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return delegate.next(result, scannerContext);
}
- public boolean next(List<Cell> arg0) throws IOException {
- return delegate.next(arg0);
+ @Override
+ public boolean next(List<Cell> result) throws IOException {
+ return delegate.next(result);
}
- public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException {
- return delegate.nextRaw(arg0, arg1);
+ @Override
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return delegate.nextRaw(result, scannerContext);
}
+ @Override
public boolean nextRaw(List<Cell> arg0) throws IOException {
return delegate.nextRaw(arg0);
}
+
+ @Override
+ public int getBatch() {
+ return delegate.getBatch();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 1f1ba36..19a1663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -80,7 +80,7 @@ import com.google.common.collect.Maps;
/**
* Region observer that aggregates grouped rows (i.e. SQL query with GROUP BY clause)
- *
+ *
* @since 0.1
*/
public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
@@ -116,7 +116,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
ScanUtil.setRowKeyOffset(scan, offset);
}
-
+
List<Expression> expressions = deserializeGroupByExpressions(expressionBytes, 0);
ServerAggregators aggregators =
ServerAggregators.deserialize(scan
@@ -124,7 +124,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
.getEnvironment().getConfiguration());
RegionScanner innerScanner = s;
-
+
byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
TupleProjector tupleProjector = null;
@@ -142,9 +142,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
innerScanner =
- getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
+ getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
- }
+ }
if (j != null) {
innerScanner =
@@ -223,13 +223,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
/**
- *
+ *
* Cache for distinct values and their aggregations which is completely
* in-memory (as opposed to spilling to disk). Used when GROUPBY_SPILLABLE_ATTRIB
* is set to false. The memory usage is tracked at a coursed grain and will
* throw and abort if too much is used.
*
- *
+ *
* @since 3.0.0
*/
private static final class InMemoryGroupByCache implements GroupByCache {
@@ -238,9 +238,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
private final ServerAggregators aggregators;
private final RegionCoprocessorEnvironment env;
private final byte[] customAnnotations;
-
+
private int estDistVals;
-
+
InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
int estValueSize = aggregators.getEstimatedByteSize();
long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize);
@@ -252,7 +252,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
this.chunk = tenantCache.getMemoryManager().allocate(estSize);
this.customAnnotations = customAnnotations;
}
-
+
@Override
public void close() throws IOException {
this.chunk.close();
@@ -291,7 +291,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
chunk.resize(estSize);
final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size());
-
+
final Iterator<Map.Entry<ImmutableBytesPtr, Aggregator[]>> cacheIter =
aggregateMap.entrySet().iterator();
while (cacheIter.hasNext()) {
@@ -333,7 +333,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
@Override
public boolean next(List<Cell> results) throws IOException {
- if (index >= aggResults.size()) return false;
+ if (index >= aggResults.size()) {
+ return false;
+ }
results.add(aggResults.get(index));
index++;
return index < aggResults.size();
@@ -343,6 +345,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
public long getMaxResultSize() {
return s.getMaxResultSize();
}
+
+ @Override
+ public int getBatch() {
+ return s.getBatch();
+ }
};
}
@@ -350,22 +357,22 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
public long size() {
return aggregateMap.size();
}
-
+
}
private static final class GroupByCacheFactory {
public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory();
-
+
private GroupByCacheFactory() {
}
-
+
GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
Configuration conf = env.getConfiguration();
boolean spillableEnabled =
conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
if (spillableEnabled) {
return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals);
- }
-
+ }
+
return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals);
}
}
@@ -388,14 +395,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
if (estDistValsBytes != null) {
// Allocate 1.5x estimation
- estDistVals = Math.max(MIN_DISTINCT_VALUES,
+ estDistVals = Math.max(MIN_DISTINCT_VALUES,
(int) (Bytes.toInt(estDistValsBytes) * 1.5f));
}
final boolean spillableEnabled =
conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
- GroupByCache groupByCache =
+ GroupByCache groupByCache =
GroupByCacheFactory.INSTANCE.newCache(
env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
aggregators, estDistVals);
@@ -453,7 +460,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
* Used for an aggregate query in which the key order match the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
* @param limit TODO
- * @throws IOException
+ * @throws IOException
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
@@ -559,11 +566,15 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
currentKey = null;
return false;
}
-
+
@Override
public long getMaxResultSize() {
return scanner.getMaxResultSize();
}
+ @Override
+ public int getBatch() {
+ return scanner.getBatch();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index cdfc771..1e34d96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.HashCache;
@@ -48,7 +49,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.TupleUtil;
public class HashJoinRegionScanner implements RegionScanner {
-
+
private final RegionScanner scanner;
private final TupleProjector projector;
private final HashJoinInfo joinInfo;
@@ -60,7 +61,7 @@ public class HashJoinRegionScanner implements RegionScanner {
private List<Tuple>[] tempTuples;
private ValueBitSet tempDestBitSet;
private ValueBitSet[] tempSrcBitSet;
-
+
@SuppressWarnings("unchecked")
public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException {
this.scanner = scanner;
@@ -92,8 +93,8 @@ public class HashJoinRegionScanner implements RegionScanner {
}
HashCache hashCache = (HashCache)cache.getServerCache(joinId);
if (hashCache == null)
- throw new DoNotRetryIOException("Could not find hash cache for joinId: "
- + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength())
+ throw new DoNotRetryIOException("Could not find hash cache for joinId: "
+ + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength())
+ ". The cache might have expired and have been removed.");
hashCaches[i] = hashCache;
tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
@@ -103,18 +104,19 @@ public class HashJoinRegionScanner implements RegionScanner {
this.projector.setValueBitSet(tempDestBitSet);
}
}
-
+
private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
if (result.isEmpty())
return;
-
+
Tuple tuple = new ResultTuple(Result.create(result));
// For backward compatibility. In new versions, HashJoinInfo.forceProjection()
// always returns true.
if (joinInfo.forceProjection()) {
tuple = projector.projectResults(tuple);
}
-
+
+ // TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well.
if (hasBatchLimit)
throw new UnsupportedOperationException("Cannot support join operations in scans with limit");
@@ -157,7 +159,7 @@ public class HashJoinRegionScanner implements RegionScanner {
Tuple lhs = resultQueue.poll();
if (!earlyEvaluation) {
ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(lhs, joinInfo.getJoinExpressions()[i]);
- tempTuples[i] = hashCaches[i].get(key);
+ tempTuples[i] = hashCaches[i].get(key);
if (tempTuples[i] == null) {
if (type == JoinType.Inner || type == JoinType.Semi) {
continue;
@@ -171,7 +173,7 @@ public class HashJoinRegionScanner implements RegionScanner {
Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
lhs : TupleProjector.mergeProjectedValue(
(ProjectedValueTuple) lhs, schema, tempDestBitSet,
- null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
+ null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
joinInfo.getFieldPositions()[i]);
resultQueue.offer(joined);
continue;
@@ -180,7 +182,7 @@ public class HashJoinRegionScanner implements RegionScanner {
Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
lhs : TupleProjector.mergeProjectedValue(
(ProjectedValueTuple) lhs, schema, tempDestBitSet,
- t, joinInfo.getSchemas()[i], tempSrcBitSet[i],
+ t, joinInfo.getSchemas()[i], tempSrcBitSet[i],
joinInfo.getFieldPositions()[i]);
resultQueue.offer(joined);
}
@@ -211,18 +213,19 @@ public class HashJoinRegionScanner implements RegionScanner {
}
}
}
-
+
private boolean shouldAdvance() {
if (!resultQueue.isEmpty())
return false;
-
+
return hasMore;
}
-
+
private boolean nextInQueue(List<Cell> results) {
- if (resultQueue.isEmpty())
+ if (resultQueue.isEmpty()) {
return false;
-
+ }
+
Tuple tuple = resultQueue.poll();
for (int i = 0; i < tuple.size(); i++) {
results.add(tuple.getValue(i));
@@ -252,19 +255,19 @@ public class HashJoinRegionScanner implements RegionScanner {
processResults(result, false);
result.clear();
}
-
+
return nextInQueue(result);
}
@Override
- public boolean nextRaw(List<Cell> result, int limit)
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
while (shouldAdvance()) {
- hasMore = scanner.nextRaw(result, limit);
- processResults(result, true);
+ hasMore = scanner.nextRaw(result, scannerContext);
+ processResults(result, false); // TODO fix honoring the limit
result.clear();
}
-
+
return nextInQueue(result);
}
@@ -285,19 +288,19 @@ public class HashJoinRegionScanner implements RegionScanner {
processResults(result, false);
result.clear();
}
-
+
return nextInQueue(result);
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
while (shouldAdvance()) {
- hasMore = scanner.next(result, limit);
- processResults(result, true);
+ hasMore = scanner.next(result, scannerContext);
+ processResults(result, false); // TODO honoring the limit
result.clear();
}
-
- return nextInQueue(result);
+
+ return nextInQueue(result);
}
@Override
@@ -305,5 +308,10 @@ public class HashJoinRegionScanner implements RegionScanner {
return this.scanner.getMaxResultSize();
}
+ @Override
+ public int getBatch() {
+ return this.scanner.getBatch();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 6f1d5ac..c40e3cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -69,20 +69,20 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
-
+
@Override
public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
boolean abortRequested) {
executor.shutdownNow();
GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll();
}
-
+
@Override
public void start(CoprocessorEnvironment env) throws IOException {
- // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves
+ // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves
// among region servers because we relies on server time of RS which is hosting
// SYSTEM.CATALOG
- long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB,
+ long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB,
QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL);
try {
if(sleepTime > 0) {
@@ -91,12 +91,12 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
- enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
+ enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
- rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
+ rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
}
-
+
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
@@ -119,7 +119,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
LOG.error("BuildIndexScheduleTask cannot start!", ex);
}
}
-
+
/**
* Task runs periodically to build indexes whose INDEX_NEED_PARTIALLY_REBUILD is set true
*
@@ -133,7 +133,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
this.env = env;
}
-
+
private String getJdbcUrl() {
String zkQuorum = this.env.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
String zkClientPort = this.env.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT,
@@ -144,7 +144,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
+ PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkClientPort
+ PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkParentNode;
}
-
+
+ @Override
public void run() {
RegionScanner scanner = null;
PhoenixConnection conn = null;
@@ -199,7 +200,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
if ((dataTable == null || dataTable.length == 0)
|| (indexStat == null || indexStat.length == 0)
- || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
+ || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
&& (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) {
// index has to be either in disable or inactive state
// data table name can't be empty
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ddde407..77e124d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -199,7 +199,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
indexMaintainer = indexMaintainers.get(0);
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
}
-
+
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
innerScanner =
@@ -285,12 +285,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
} finally {
try {
if(iterator != null) {
- iterator.close();
+ iterator.close();
}
} catch (SQLException e) {
ServerUtil.throwIOException(region.getRegionNameAsString(), e);
} finally {
- chunk.close();
+ chunk.close();
}
}
}
@@ -299,6 +299,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
public long getMaxResultSize() {
return s.getMaxResultSize();
}
+
+ @Override
+ public int getBatch() {
+ return s.getBatch();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index e43e5e5..2d6d98a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -101,8 +101,8 @@ import com.google.common.collect.Sets;
/**
* Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY).
- *
- *
+ *
+ *
* @since 0.1
*/
public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
@@ -116,7 +116,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
public static final String EMPTY_CF = "EmptyCF";
private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
private KeyValueBuilder kvBuilder;
-
+
@Override
public void start(CoprocessorEnvironment e) throws IOException {
super.start(e);
@@ -139,14 +139,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
public static void serializeIntoScan(Scan scan) {
scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
}
-
+
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
throws IOException {
s = super.preScannerOpen(e, scan, s);
if (ScanUtil.isAnalyzeTable(scan)) {
// We are setting the start row and stop row such that it covers the entire region. As part
- // of Phonenix-1263 we are storing the guideposts against the physical table rather than
+ // of Phonenix-1263 we are storing the guideposts against the physical table rather than
// individual tenant specific tables.
scan.setStartRow(HConstants.EMPTY_START_ROW);
scan.setStopRow(HConstants.EMPTY_END_ROW);
@@ -154,7 +154,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
return s;
}
-
+
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
int offset = 0;
@@ -179,9 +179,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
-
+
RegionScanner theScanner = s;
-
+
byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
PTable projectedTable = null;
List<Expression> selectExpressions = null;
@@ -226,14 +226,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
theScanner =
- getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
+ getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
- }
-
+ }
+
if (j != null) {
theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
}
-
+
int batchSize = 0;
List<Mutation> mutations = Collections.emptyList();
boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
@@ -330,7 +330,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
column.getDataType().coerceBytes(ptr, value,
expression.getDataType(), expression.getMaxLength(),
- expression.getScale(), expression.getSortOrder(),
+ expression.getScale(), expression.getSortOrder(),
column.getMaxLength(), column.getScale(),
column.getSortOrder());
byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
@@ -418,7 +418,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
}
}
-
+
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
}
@@ -438,7 +438,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
}
final KeyValue aggKeyValue = keyValue;
-
+
RegionScanner scanner = new BaseRegionScanner() {
private boolean done = !hadAny;
@@ -464,11 +464,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
results.add(aggKeyValue);
return false;
}
-
+
@Override
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
+
+ @Override
+ public int getBatch() {
+ return innerScanner.getBatch();
+ }
};
return scanner;
}
@@ -496,7 +501,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
indexMutations.clear();
}
-
+
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, InternalScanner scanner, final ScanType scanType)
@@ -505,8 +510,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
InternalScanner internalScanner = scanner;
if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
- boolean useCurrentTime =
- c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ boolean useCurrentTime =
+ c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
// Provides a means of clients controlling their timestamps to not use current time
// when background tasks are updating stats. Instead we track the max timestamp of
@@ -526,8 +531,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
}
return internalScanner;
}
-
-
+
+
@Override
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
throws IOException {
@@ -535,8 +540,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
TableName table = region.getRegionInfo().getTable();
StatisticsCollector stats = null;
try {
- boolean useCurrentTime =
- e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ boolean useCurrentTime =
+ e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
// Provides a means of clients controlling their timestamps to not use current time
// when background tasks are updating stats. Instead we track the max timestamp of
@@ -544,7 +549,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString(), clientTimeStamp);
stats.splitStats(region, l, r);
- } catch (IOException ioe) {
+ } catch (IOException ioe) {
if(logger.isWarnEnabled()) {
logger.warn("Error while collecting stats during split for " + table,ioe);
}
@@ -559,7 +564,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
return PTableImpl.createFromProto(ptableProto);
} catch (IOException e) {
throw new RuntimeException(e);
- }
+ }
}
private static List<Expression> deserializeExpressions(byte[] b) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
index 3469042..71cc1d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -70,4 +70,4 @@ public class LocalTable implements LocalHBaseState {
scanner.close();
return r;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
index 68555ef..d39b01d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java
@@ -58,14 +58,14 @@ public class FamilyOnlyFilter extends FamilyFilter {
@Override
public ReturnCode filterKeyValue(Cell v) {
if (done) {
- return ReturnCode.SKIP;
+ return ReturnCode.NEXT_ROW;
}
ReturnCode code = super.filterKeyValue(v);
if (previousMatchFound) {
// we found a match before, and now we are skipping the key because of the family, therefore
// we are done (no more of the family).
- if (code.equals(ReturnCode.SKIP)) {
- done = true;
+ if (code.equals(ReturnCode.SKIP) || code.equals(ReturnCode.NEXT_ROW)) {
+ done = true;
}
} else {
// if we haven't seen a match before, then it doesn't matter what we see now, except to mark
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
index e225696..435a1c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -57,7 +57,7 @@ public class FilteredKeyValueScanner implements KeyValueScanner {
/**
* Same as {@link KeyValueScanner#next()} except that we filter out the next {@link KeyValue} until we find one that
* passes the filter.
- *
+ *
* @return the next {@link KeyValue} or <tt>null</tt> if no next {@link KeyValue} is present and passes all the
* filters.
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index b89c807..b5e6a63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -59,14 +59,14 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
Mutation m = miniBatchOp.getOperation(i);
keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
-
+
for(IndexMaintainer indexMaintainer: indexMaintainers) {
if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
indexTableName.set(indexMaintainer.getIndexTableName());
if (maintainers.get(indexTableName) != null) continue;
maintainers.put(indexTableName, indexMaintainer);
}
-
+
}
if (maintainers.isEmpty()) return;
Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
@@ -100,7 +100,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
private PhoenixIndexCodec getCodec() {
return (PhoenixIndexCodec)this.codec;
}
-
+
@Override
public byte[] getBatchId(Mutation m){
return this.codec.getBatchId(m);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 88e141a..52fbe9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -31,15 +31,15 @@ import org.apache.phoenix.util.ServerUtil;
public class RegionScannerResultIterator extends BaseResultIterator {
private final RegionScanner scanner;
-
+
public RegionScannerResultIterator(RegionScanner scanner) {
this.scanner = scanner;
}
-
+
@Override
public Tuple next() throws SQLException {
- // XXX: No access here to the region instance to enclose this with startRegionOperation /
- // stopRegionOperation
+ // XXX: No access here to the region instance to enclose this with startRegionOperation /
+ // stopRegionOperation
synchronized (scanner) {
try {
// TODO: size
@@ -48,6 +48,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
// since this is an indication of whether or not there are more values after the
// ones returned
boolean hasMore = scanner.nextRaw(results);
+
if (!hasMore && results.isEmpty()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index de59304..0e50923 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
@@ -58,15 +59,15 @@ public class StatisticsScanner implements InternalScanner {
}
@Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- boolean ret = delegate.next(result, limit);
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ boolean ret = delegate.next(result, scannerContext);
updateStat(result);
return ret;
}
/**
* Update the current statistics based on the latest batch of key-values from the underlying scanner
- *
+ *
* @param results
* next batch of {@link KeyValue}s
*/
@@ -122,4 +123,5 @@ public class StatisticsScanner implements InternalScanner {
}
}
}
-}
\ No newline at end of file
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 12f1863..030b114 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.RpcScheduler.Context;
+import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.junit.Test;
import org.mockito.Mockito;
@@ -86,11 +87,12 @@ public class PhoenixIndexRpcSchedulerTest {
}
private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
+ Connection connection = Mockito.mock(Connection.class);
CallRunner task = Mockito.mock(CallRunner.class);
RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
RpcServer server = new RpcServer(null, "test-rpcserver", null, isa, conf, scheduler);
RpcServer.Call call =
- server.new Call(0, null, null, header, null, null, null, null, 10, null);
+ server.new Call(0, null, null, header, null, null, connection, null, 10, null, null);
Mockito.when(task.getCall()).thenReturn(call);
scheduler.dispatch(task);
@@ -98,4 +100,4 @@ public class PhoenixIndexRpcSchedulerTest {
Mockito.verify(task).getCall();
Mockito.verifyNoMoreInteractions(task);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index 54db5d8..e996b23 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -37,7 +37,6 @@ import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
index 216f548..808e6bc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java
@@ -47,7 +47,7 @@ public class TestFamilyOnlyFilter {
kv = new KeyValue(row, fam2, qual, 10, val);
code = filter.filterKeyValue(kv);
- assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+ assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
}
@Test
@@ -61,7 +61,7 @@ public class TestFamilyOnlyFilter {
KeyValue kv = new KeyValue(row, fam, qual, 10, val);
ReturnCode code = filter.filterKeyValue(kv);
- assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+ assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
kv = new KeyValue(row, fam2, qual, 10, val);
code = filter.filterKeyValue(kv);
@@ -69,7 +69,7 @@ public class TestFamilyOnlyFilter {
kv = new KeyValue(row, fam3, qual, 10, val);
code = filter.filterKeyValue(kv);
- assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+ assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
}
@Test
@@ -83,7 +83,7 @@ public class TestFamilyOnlyFilter {
KeyValue kv = new KeyValue(row, fam, qual, 10, val);
ReturnCode code = filter.filterKeyValue(kv);
- assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+ assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
KeyValue accept = new KeyValue(row, fam2, qual, 10, val);
code = filter.filterKeyValue(accept);
@@ -91,12 +91,12 @@ public class TestFamilyOnlyFilter {
kv = new KeyValue(row, fam3, qual, 10, val);
code = filter.filterKeyValue(kv);
- assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code);
+ assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code);
// we shouldn't match the family again - everything after a switched family should be ignored
code = filter.filterKeyValue(accept);
assertEquals("Should have skipped a 'matching' family if it arrives out of order",
- ReturnCode.SKIP, code);
+ ReturnCode.NEXT_ROW, code);
// reset the filter and we should accept it again
filter.reset();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index 60c11d7..ae577bd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -317,9 +317,9 @@ public class TestWALRecoveryCaching {
}
LOG.info("Starting region server:" + server.getHostname());
- cluster.startRegionServer(server.getHostname());
+ cluster.startRegionServer(server.getHostname(), server.getPort());
- cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT);
+ cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), TIMEOUT);
// start a server to get back to the base number of servers
LOG.info("STarting server to replace " + server);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7182d49/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 7ed0801..b2b9a47 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -85,7 +85,6 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
- <version>${hbase.version}</version>
<scope>test</scope>
<optional>true</optional>
<exclusions>
@@ -98,7 +97,6 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
@@ -111,41 +109,34 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
- <version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
- <version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
- <version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
- <version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
- <version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>