Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/07 12:30:54 UTC
[5/8] incubator-eagle git commit: [EAGLE-520] Fix and decouple
co-processor from eagle aggregation query service
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
index 53d27de..7ef8b80 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.storage.hbase.query.coprocessor;
+import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.*;
import org.apache.eagle.log.entity.meta.EntityDefinition;
@@ -25,7 +26,7 @@ import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
import org.apache.eagle.query.aggregate.raw.RawAggregator;
import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator;
import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-import org.apache.eagle.common.DateTimeUtil;
+
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@@ -49,8 +50,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-//public abstract class AbstractAggregateEndPoint extends BaseEndpointCoprocessor{
public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol implements AggregateProtocol, Coprocessor, CoprocessorService {
+ private static final Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
private RegionCoprocessorEnvironment env;
@@ -59,16 +60,19 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
return this;
}
- public AggregateProtocolEndPoint() {}
+ public AggregateProtocolEndPoint() {
+ }
- protected void checkNotNull(Object obj,String name) {
- if(obj==null) throw new NullPointerException(name+" is null");
- }
+ protected void checkNotNull(Object obj, String name) {
+ if (obj == null) {
+ throw new NullPointerException(name + " is null");
+ }
+ }
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
- this.env = (RegionCoprocessorEnvironment)env;
+ this.env = (RegionCoprocessorEnvironment) env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
@@ -79,302 +83,296 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
// do nothing
}
-// @Override
-// public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode) throws IOException {
-// if (AggregateProtocol.class.getName().equals(protocol)) {
-//// return new ProtocolSignature(AggregateProtocol.VERSION, null);
-// return new ProtocolSignature(98l, null);
-// }
-// throw new IOException("Unknown protocol: " + protocol);
-// }
-
- protected HRegion getCurrentRegion(){
- return this.env.getRegion();
- }
-
- /**
- * <pre>
- * Region-unittest,\x82\xB4\x85\xC2\x7F\xFF\xFE\xB6\xC9jNG\xEE!\x5C3\xBB\xAE\xA1:\x05\xA5\xA9x\xB0\xA1"8\x05\xFB(\xD2VY\xDB\x9A\x06\x09\xA9\x98\xC2\xE3\x8D=,1413960230654.aaf2a6c9f2c87c196f43497243bb2424.
- * RegionID-unittest,1413960230654
- * </pre>
- */
- protected String getLogHeader(){
- HRegion region = this.getCurrentRegion();
- return LOG.isDebugEnabled() ? String.format("Region-%s",region.getRegionNameAsString()):
- String.format("Region-%s,%d",region.getTableDesc().getNameAsString(),region.getRegionId());
- }
-
- protected class InternalReadReport {
- public InternalReadReport(long counter,long startTimestamp,long stopTimestamp){
- this.counter = counter;
- this.startTimestamp = startTimestamp;
- this.stopTimestamp = stopTimestamp;
- }
- public long getCounter() {
- return counter;
- }
- public void setCounter(long counter) {
- this.counter = counter;
- }
-
- public long getStartTimestamp() {
- return startTimestamp;
- }
-
- public void setStartTimestamp(long startTimestamp) {
- this.startTimestamp = startTimestamp;
- }
-
- public long getStopTimestamp() {
- return stopTimestamp;
- }
-
- public void setStopTimestamp(long stopTimestamp) {
- this.stopTimestamp = stopTimestamp;
- }
-
- private long counter;
- private long startTimestamp;
- private long stopTimestamp;
- }
-
- /**
- * Asynchronous HBase scan read as entity
- *
- * @param scan
- * @throws java.io.IOException
- */
- protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, EntityCreationListener listener) throws IOException {
-// _init();
- long counter = 0;
- long startTimestamp = 0;
- long stopTimestamp = 0;
-
- InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
- List<Cell> results = new ArrayList<Cell>();
- try{
- boolean hasMoreRows;
- GenericMetricShadowEntity singleMetricEntity = null;
- do{
- hasMoreRows = scanner.next(results);
- Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
- if(!results.isEmpty()){
- counter ++;
- byte[] row = results.get(0).getRow();
- long timestamp = RowkeyBuilder.getTimestamp(row, ed);
-
- // Min
- if(startTimestamp == 0 || startTimestamp > timestamp ){
- startTimestamp = timestamp;
- }
-
- // Max
- if(stopTimestamp == 0 || stopTimestamp < timestamp ){
- stopTimestamp = timestamp;
- }
-
- for(Cell kv:results){
- String qualifierName = Bytes.toString(kv.getQualifier());
-// Qualifier qualifier = null;
-// if(!ed.isTag(qualifierName)){
-// qualifier = ed.getQualifierNameMap().get(qualifierName);
-// if(qualifier == null){
-// LOG.error("qualifier for " + qualifierName + " not exist");
-// throw new NullPointerException("qualifier for field "+qualifierName+" not exist");
-// }
-// }
- if(kv.getValue()!=null) kvMap.put(qualifierName ,kv.getValue());
- }
-
- // LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
-
- InternalLog internalLog = HBaseInternalLogHelper.buildObject(ed, row, timestamp, kvMap);
- if(internalLog!=null){
- TaggedLogAPIEntity logAPIEntity = null;
- try {
- logAPIEntity = HBaseInternalLogHelper.buildEntity(internalLog, ed);
- if(logAPIEntity instanceof GenericMetricEntity){
- if(singleMetricEntity == null) singleMetricEntity = new GenericMetricShadowEntity();
- GenericMetricEntity e = (GenericMetricEntity)logAPIEntity;
- if(e.getValue()!=null) {
- int count = e.getValue().length;
- @SuppressWarnings("unused")
- Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
- for (int i = 0; i < count; i++) {
- long ts = logAPIEntity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
- // exclude those entity which is not within the time range in search condition. [start, end)
- singleMetricEntity.setTimestamp(ts);
- singleMetricEntity.setTags(e.getTags());
- singleMetricEntity.setValue(e.getValue()[i]);
- // Min
- if (startTimestamp == 0 || startTimestamp > ts) startTimestamp = ts;
- // Max
- if (stopTimestamp == 0 || stopTimestamp < ts) stopTimestamp = ts;
- listener.entityCreated(singleMetricEntity);
- }
- }
- }else {
- // LOG.info("DEBUG: rowKey="+logAPIEntity.getEncodedRowkey());
- listener.entityCreated(logAPIEntity);
- }
- } catch (Exception e) {
- if(internalLog!=null) {
- LOG.error("Got exception to handle " + internalLog.toString() + ": " + e.getMessage(), e);
- }
- throw new IOException(e);
- }
- }else{
- LOG.error("Got null to parse internal log for row: " + row.length + " with fields: " + kvMap);
- }
- results.clear();
- }else{
- if(LOG.isDebugEnabled()) LOG.warn("Empty batch of KeyValue");
- }
- } while(hasMoreRows);
- }catch(IOException ex){
- LOG.error(ex.getMessage(),ex);
- throw ex;
- } finally {
- if(scanner != null) {
+ protected HRegion getCurrentRegion() {
+ return this.env.getRegion();
+ }
+
+ /**
+ * <pre>
+ * Region-unittest
+ * ,\x82\xB4\x85\xC2\x7F\xFF\xFE\xB6\xC9jNG\xEE!\x5C3\xBB\xAE\xA1
+ * :\x05\xA5\xA9x\xB0\xA1"8\x05\xFB(\xD2VY\xDB\x9A\x06\x09\xA9\x98\xC2\xE3\x8D=,1413960230654.aaf2a6c9f2c87c196f43497243bb2424.
+ *
+ * RegionID-unittest,1413960230654
+ * </pre>
+ */
+ protected String getLogHeader() {
+ HRegion region = this.getCurrentRegion();
+ return LOG.isDebugEnabled() ? String.format("Region-%s", region.getRegionNameAsString()) :
+ String.format("Region-%s,%d", region.getTableDesc().getNameAsString(), region.getRegionId());
+ }
+
+ protected class InternalReadReport {
+ public InternalReadReport(long counter, long startTimestamp, long stopTimestamp) {
+ this.counter = counter;
+ this.startTimestamp = startTimestamp;
+ this.stopTimestamp = stopTimestamp;
+ }
+
+ public long getCounter() {
+ return counter;
+ }
+
+ public void setCounter(long counter) {
+ this.counter = counter;
+ }
+
+ public long getStartTimestamp() {
+ return startTimestamp;
+ }
+
+ public void setStartTimestamp(long startTimestamp) {
+ this.startTimestamp = startTimestamp;
+ }
+
+ public long getStopTimestamp() {
+ return stopTimestamp;
+ }
+
+ public void setStopTimestamp(long stopTimestamp) {
+ this.stopTimestamp = stopTimestamp;
+ }
+
+ private long counter;
+ private long startTimestamp;
+ private long stopTimestamp;
+ }
+
+ /**
+ * Asynchronous HBase scan read as entity.
+ */
+ protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, EntityCreationListener listener) throws IOException {
+ long counter = 0;
+ long startTimestamp = 0;
+ long stopTimestamp = 0;
+
+ InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
+ List<Cell> results = new ArrayList<Cell>();
+ try {
+ boolean hasMoreRows;
+ GenericMetricShadowEntity singleMetricEntity = null;
+ do {
+ hasMoreRows = scanner.next(results);
+ Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
+ if (!results.isEmpty()) {
+ counter++;
+ byte[] row = results.get(0).getRow();
+ long timestamp = RowkeyBuilder.getTimestamp(row, ed);
+
+ // Min
+ if (startTimestamp == 0 || startTimestamp > timestamp) {
+ startTimestamp = timestamp;
+ }
+
+ // Max
+ if (stopTimestamp == 0 || stopTimestamp < timestamp) {
+ stopTimestamp = timestamp;
+ }
+
+ for (Cell kv : results) {
+ String qualifierName = Bytes.toString(kv.getQualifier());
+ // Qualifier qualifier = null;
+ // if(!ed.isTag(qualifierName)){
+ // qualifier = ed.getQualifierNameMap().get(qualifierName);
+ // if(qualifier == null){
+ // LOG.error("qualifier for " + qualifierName + " not exist");
+ // throw new NullPointerException("qualifier for field "+qualifierName+" not exist");
+ // }
+ // }
+ if (kv.getValue() != null) {
+ kvMap.put(qualifierName, kv.getValue());
+ }
+ }
+
+ // LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
+
+ InternalLog internalLog = HBaseInternalLogHelper.buildObject(ed, row, timestamp, kvMap);
+ if (internalLog != null) {
+ TaggedLogAPIEntity logAPIEntity = null;
+ try {
+ logAPIEntity = HBaseInternalLogHelper.buildEntity(internalLog, ed);
+ if (logAPIEntity instanceof GenericMetricEntity) {
+ if (singleMetricEntity == null) {
+ singleMetricEntity = new GenericMetricShadowEntity();
+ }
+ GenericMetricEntity e = (GenericMetricEntity) logAPIEntity;
+ if (e.getValue() != null) {
+ int count = e.getValue().length;
+ @SuppressWarnings("unused")
+ Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
+ for (int i = 0; i < count; i++) {
+ long ts = logAPIEntity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
+ // exclude entities that fall outside the search condition's time range: [start, end)
+ singleMetricEntity.setTimestamp(ts);
+ singleMetricEntity.setTags(e.getTags());
+ singleMetricEntity.setValue(e.getValue()[i]);
+ // Min
+ if (startTimestamp == 0 || startTimestamp > ts) {
+ startTimestamp = ts;
+ }
+ // Max
+ if (stopTimestamp == 0 || stopTimestamp < ts) {
+ stopTimestamp = ts;
+ }
+ listener.entityCreated(singleMetricEntity);
+ }
+ }
+ } else {
+ // LOG.info("DEBUG: rowKey="+logAPIEntity.getEncodedRowkey());
+ listener.entityCreated(logAPIEntity);
+ }
+ } catch (Exception e) {
+ if (internalLog != null) {
+ LOG.error("Got exception to handle " + internalLog.toString() + ": " + e.getMessage(), e);
+ }
+ throw new IOException(e);
+ }
+ } else {
+ LOG.error("Got null to parse internal log for row: " + row.length + " with fields: " + kvMap);
+ }
+ results.clear();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.warn("Empty batch of KeyValue");
+ }
+ }
+ }
+ while (hasMoreRows);
+ } catch (IOException ex) {
+ LOG.error(ex.getMessage(), ex);
+ throw ex;
+ } finally {
+ if (scanner != null) {
scanner.close();
}
- }
- return new InternalReadReport(counter,startTimestamp,stopTimestamp);
- }
-
- /**
- * Asynchronous HBase scan read as RAW qualifier
- *
- * @param scan
- * @param listener
- * @throws Exception
- */
- protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, QualifierCreationListener listener) throws IOException {
-// _init();
- long counter = 0;
- long startTimestamp = 0;
- long stopTimestamp = 0;
- InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
- List<Cell> results = new ArrayList<Cell>();
- try{
- boolean hasMoreRows;//false by default
- do{
- hasMoreRows = scanner.next(results);
- Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
- if(!results.isEmpty()){
- counter ++;
- byte[] row = results.get(0).getRow();
-// if(ed.isTimeSeries()){
- long timestamp = RowkeyBuilder.getTimestamp(row,ed);
- // Min
- if(startTimestamp == 0 || startTimestamp > timestamp ){
- startTimestamp = timestamp;
- }
- // Max
- if(stopTimestamp == 0 || stopTimestamp < timestamp ){
- stopTimestamp = timestamp;
- }
-// }
-
- for(Cell kv:results){
- String qualifierName = Bytes.toString(kv.getQualifier());
- Qualifier qualifier = null;
- if(!ed.isTag(qualifierName)){
- qualifier = ed.getQualifierNameMap().get(qualifierName);
- if(qualifier == null){
- LOG.error("qualifier for field " + qualifierName + " not exist");
- throw new IOException(new NullPointerException("qualifier for field "+qualifierName+" is null"));
- }
- qualifierName = qualifier.getDisplayName();
- }
- if(kv.getValue()!=null) kvMap.put(qualifierName,kv.getValue());
- }
-
-// LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
-
- if(!kvMap.isEmpty()) listener.qualifierCreated(kvMap);
- results.clear();
- }else{
- if(LOG.isDebugEnabled()) LOG.warn("Empty batch of KeyValue");
- }
- } while(hasMoreRows);
- } catch(IOException ex){
- LOG.error(ex.getMessage(),ex);
- throw ex;
- } finally {
- if(scanner != null) {
+ }
+ return new InternalReadReport(counter, startTimestamp, stopTimestamp);
+ }
+
+ /**
+ * Asynchronous HBase scan read as RAW qualifier.
+ */
+ protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, QualifierCreationListener listener) throws IOException {
+ long counter = 0;
+ long startTimestamp = 0;
+ long stopTimestamp = 0;
+ InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
+ List<Cell> results = new ArrayList<Cell>();
+ try {
+ boolean hasMoreRows; // set by scanner.next(results) on each iteration
+ do {
+ hasMoreRows = scanner.next(results);
+ Map<String, byte[]> kvMap = new HashMap<>();
+ if (!results.isEmpty()) {
+ counter++;
+ byte[] row = results.get(0).getRow();
+ long timestamp = RowkeyBuilder.getTimestamp(row, ed);
+ // Min
+ if (startTimestamp == 0 || startTimestamp > timestamp) {
+ startTimestamp = timestamp;
+ }
+ // Max
+ if (stopTimestamp == 0 || stopTimestamp < timestamp) {
+ stopTimestamp = timestamp;
+ }
+
+ for (Cell kv : results) {
+ String qualifierName = Bytes.toString(kv.getQualifier());
+ Qualifier qualifier = null;
+ if (!ed.isTag(qualifierName)) {
+ qualifier = ed.getQualifierNameMap().get(qualifierName);
+ if (qualifier == null) {
+ LOG.error("qualifier for field " + qualifierName + " not exist");
+ throw new IOException(new NullPointerException("qualifier for field " + qualifierName + " is null"));
+ }
+ qualifierName = qualifier.getDisplayName();
+ }
+ if (kv.getValue() != null) {
+ kvMap.put(qualifierName, kv.getValue());
+ }
+ }
+
+ if (!kvMap.isEmpty()) {
+ listener.qualifierCreated(kvMap);
+ }
+ results.clear();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.warn("Empty batch of KeyValue");
+ }
+ }
+ }
+ while (hasMoreRows);
+ } catch (IOException ex) {
+ LOG.error(ex.getMessage(), ex);
+ throw ex;
+ } finally {
+ if (scanner != null) {
scanner.close();
}
- }
+ }
- return new InternalReadReport(counter,startTimestamp,stopTimestamp);
- }
+ return new InternalReadReport(counter, startTimestamp, stopTimestamp);
+ }
@Override
- public void aggregate(RpcController controller, AggregateProtos.AggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
+ public void timeseriesAggregate(RpcController controller, AggregateProtos.TimeSeriesAggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
AggregateResult result = null;
try {
result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()),
- ProtoBufConverter.fromPBScan(request.getScan()),
- ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
- ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
- ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList())
+ ProtoBufConverter.fromPBScan(request.getScan()),
+ ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
+ ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
+ ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()),
+ request.getStartTime(),
+ request.getEndTime(),
+ request.getIntervalMin()
);
} catch (IOException e) {
+ LOG.error("Failed to convert result to PB-based message", e);
ResponseConverter.setControllerException(controller, e);
}
try {
done.run(ProtoBufConverter.toPBAggregateResult(result));
} catch (IOException e) {
- throw new RuntimeException("Failed to convert result to PB-based message",e);
+ LOG.error("Failed to convert result to PB-based message", e);
+ ResponseConverter.setControllerException(controller, e);
}
}
@Override
- public void timeseriesAggregate(RpcController controller, AggregateProtos.TimeSeriesAggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
+ public void aggregate(RpcController controller, AggregateProtos.AggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
AggregateResult result = null;
try {
result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()),
- ProtoBufConverter.fromPBScan(request.getScan()),
- ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
- ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
- ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()),
- request.getStartTime(),
- request.getEndTime(),
- request.getIntervalMin()
+ ProtoBufConverter.fromPBScan(request.getScan()),
+ ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
+ ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
+ ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList())
);
} catch (IOException e) {
- LOG.error("Failed to convert result to PB-based message",e);
ResponseConverter.setControllerException(controller, e);
}
try {
done.run(ProtoBufConverter.toPBAggregateResult(result));
} catch (IOException e) {
- LOG.error("Failed to convert result to PB-based message",e);
- ResponseConverter.setControllerException(controller, e);
+ throw new RuntimeException("Failed to convert result to PB-based message", e);
}
}
- private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
- /**
- *
- * @param entityDefinition
- * @param scan
- * @param groupbyFields
- * @param aggregateFuncTypes
- * @param aggregatedFields
- * @return
- * @throws Exception
- */
@Override
- public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
-// LOG.info("Using coprocessor instance: "+this);
+ public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+ List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
checkNotNull(entityDefinition, "entityDefinition");
String serviceName = entityDefinition.getService();
- LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
- if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
- long _start = System.currentTimeMillis();
- final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition);
+ LOG.info(this.getLogHeader() + " raw group aggregate on service: " + serviceName
+ + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SCAN: " + scan.toJSON());
+ }
+ final long startTimestamp = System.currentTimeMillis();
+ final RawAggregator aggregator = new RawAggregator(groupbyFields,
+ AggregateFunctionType.fromBytesList(aggregateFuncTypes), aggregatedFields, entityDefinition);
InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
@@ -384,34 +382,31 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
result.setStopTimestamp(report.getStopTimestamp());
long _stop = System.currentTimeMillis();
- LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
+ LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms",
+ this.getLogHeader(), report.getCounter(), keyValues.size(), report.getStartTimestamp(),
+ report.getStopTimestamp(), (_stop - startTimestamp)));
return result;
}
- /**
- * TODO: refactor time series aggregator to remove dependency of business logic entity class
- *
- * @param entityDefinition
- * @param scan
- * @param groupbyFields
- * @param aggregateFuncTypes
- * @param aggregatedFields
- * @param intervalMin
- * @return
- * @throws Exception
- */
@Override
- public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException {
-// LOG.info("Using coprocessor instance: "+this);
+ public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+ List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime, long endTime, long intervalMin) throws IOException {
checkNotNull(entityDefinition, "entityDefinition");
String serviceName = entityDefinition.getService();
- LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin +
- " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
- if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
- long _start = System.currentTimeMillis();
- final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin);
- InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator);
+ LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName
+ + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes)
+ + " fields: " + aggregatedFields + " intervalMin: " + intervalMin
+ + " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime)
+ + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SCAN: " + scan.toJSON());
+ }
+
+ final long startTimestamp = System.currentTimeMillis();
+ final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,
+ AggregateFunctionType.fromBytesList(aggregateFuncTypes), aggregatedFields, startTime, endTime, intervalMin);
+ InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
AggregateResult result = new AggregateResult();
@@ -420,28 +415,9 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
result.setStopTimestamp(report.getStopTimestamp());
long _stop = System.currentTimeMillis();
- LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
+ LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms",
+ this.getLogHeader(), report.getCounter(), keyValues.size(), report.getStartTimestamp(), report.getStopTimestamp(), (_stop - startTimestamp)));
return result;
}
-
-// /**
-// * Initialization per aggregate RPC call
-// */
-// private void _init(){
-// this.startTimestamp = 0;
-// this.stopTimestamp = 0;
-// }
-//
-// // Min
-// private long startTimestamp;
-// // Max
-// private long stopTimestamp;
-//
-// public long getStartTimestamp() {
-// return startTimestamp;
-// }
-// public long getStopTimestamp() {
-// return stopTimestamp;
-// }
}
\ No newline at end of file
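Note: with the endpoint above now exposed as a protobuf-based CoprocessorService,
a client reaches it through HTableInterface#coprocessorService instead of the
removed 0.94-style coprocessorExec. A minimal client-side sketch, assuming a
prepared AggregateProtos.AggregateRequest; the class and parameter names here
are illustrative only, not part of this patch:

    import java.io.IOException;

    import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;

    public class AggregateClientSketch {
        // Invokes AggregateProtocolEndPoint#aggregate on every region overlapping
        // [startRow, stopRow] and feeds each per-region partial result to callback.
        public static void callAggregate(HTableInterface table,
                                         final AggregateProtos.AggregateRequest request,
                                         byte[] startRow, byte[] stopRow,
                                         Batch.Callback<AggregateProtos.AggregateResult> callback) throws Throwable {
            table.coprocessorService(
                AggregateProtos.AggregateProtocol.class,
                startRow, stopRow,
                new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
                    @Override
                    public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol region)
                            throws IOException {
                        // The generated stub is callback-based; block until this region replies.
                        BlockingRpcCallback<AggregateProtos.AggregateResult> done =
                                new BlockingRpcCallback<AggregateProtos.AggregateResult>();
                        region.aggregate(null, request, done);
                        return done.get();
                    }
                },
                callback);
        }
    }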
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
index 84380eb..a49ad57 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
@@ -31,8 +31,7 @@ import java.io.Serializable;
import java.util.List;
/**
- * Aggregated writable result consist of group-by key-values list and additional meta information
- *
+ * Aggregated writable result consists of a group-by key-value list and additional meta information.
* <h2>Schema</h2>
* <pre>
* {
@@ -42,88 +41,88 @@ import java.util.List;
* }
* </pre>
*/
-public class AggregateResult implements Writable,Serializable{
-
- private final static Logger LOG = LoggerFactory.getLogger(AggregateResult.class);
-
- /**
- * Automatically generated default serialVersionUID
- */
- private static final long serialVersionUID = 1L;
-
- private final WritableList<GroupbyKeyValue> keyValues;
-
- private long startTimestamp = 0;
-
- public long getStartTimestamp() {
- return startTimestamp;
- }
-
- public void setStartTimestamp(long startTimestamp) {
- this.startTimestamp = startTimestamp;
- }
-
- public long getStopTimestamp() {
- return stopTimestamp;
- }
-
- public void setStopTimestamp(long stopTimestamp) {
- this.stopTimestamp = stopTimestamp;
- }
-
- public WritableList<GroupbyKeyValue> getKeyValues() {
- return keyValues;
- }
-
- public void setKeyValues(List<GroupbyKeyValue> keyValues){
- this.keyValues.addAll(keyValues);
- }
-
- private long stopTimestamp;
-
- public AggregateResult(){
- this.keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.startTimestamp = in.readLong();
- this.stopTimestamp = in.readLong();
- keyValues.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(this.startTimestamp);
- out.writeLong(this.stopTimestamp);
- keyValues.write(out);
- }
-
-
- public static AggregateResult build(List<String[]> keys,List<double[]> values,List<Integer> counts,long startTimestamp,long stopTimestamp){
- if(keys.size() > values.size()){
- throw new IllegalArgumentException("keys' size: "+keys.size()+" not equal with values' size: "+values.size());
- }
- AggregateResult result = new AggregateResult();
- result.setStartTimestamp(startTimestamp);
- result.setStopTimestamp(stopTimestamp);
- WritableList<GroupbyKeyValue> keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class,keys.size());
-
- for(int i=0;i<keys.size();i++) {
- String[] key = keys.get(i);
- GroupbyKey gkey = new GroupbyKey();
- for (String k : key) {
- gkey.addValue(k.getBytes());
- }
- GroupbyValue gvalue = new GroupbyValue();
- double[] value = values.get(i);
- for(double v:value){
- gvalue.add(v);
- gvalue.addMeta(counts.get(i));
- }
- keyValues.add(new GroupbyKeyValue(gkey, gvalue));
- }
- result.setKeyValues(keyValues);
- return result;
- }
+public class AggregateResult implements Writable, Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AggregateResult.class);
+
+ /**
+ * Automatically generated default serialVersionUID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ private final WritableList<GroupbyKeyValue> keyValues;
+
+ private long startTimestamp = 0;
+
+ public long getStartTimestamp() {
+ return startTimestamp;
+ }
+
+ public void setStartTimestamp(long startTimestamp) {
+ this.startTimestamp = startTimestamp;
+ }
+
+ public long getStopTimestamp() {
+ return stopTimestamp;
+ }
+
+ public void setStopTimestamp(long stopTimestamp) {
+ this.stopTimestamp = stopTimestamp;
+ }
+
+ public WritableList<GroupbyKeyValue> getKeyValues() {
+ return keyValues;
+ }
+
+ public void setKeyValues(List<GroupbyKeyValue> keyValues) {
+ this.keyValues.addAll(keyValues);
+ }
+
+ private long stopTimestamp;
+
+ public AggregateResult() {
+ this.keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.startTimestamp = in.readLong();
+ this.stopTimestamp = in.readLong();
+ keyValues.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.startTimestamp);
+ out.writeLong(this.stopTimestamp);
+ keyValues.write(out);
+ }
+
+
+ public static AggregateResult build(List<String[]> keys, List<double[]> values, List<Integer> counts, long startTimestamp, long stopTimestamp) {
+ if (keys.size() > values.size()) {
+ throw new IllegalArgumentException("keys' size: " + keys.size() + " not equal with values' size: " + values.size());
+ }
+ AggregateResult result = new AggregateResult();
+ result.setStartTimestamp(startTimestamp);
+ result.setStopTimestamp(stopTimestamp);
+ WritableList<GroupbyKeyValue> keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class, keys.size());
+
+ for (int i = 0; i < keys.size(); i++) {
+ String[] key = keys.get(i);
+ GroupbyKey gkey = new GroupbyKey();
+ for (String k : key) {
+ gkey.addValue(k.getBytes());
+ }
+ GroupbyValue gvalue = new GroupbyValue();
+ double[] value = values.get(i);
+ for (double v : value) {
+ gvalue.add(v);
+ gvalue.addMeta(counts.get(i));
+ }
+ keyValues.add(new GroupbyKeyValue(gkey, gvalue));
+ }
+ result.setKeyValues(keyValues);
+ return result;
+ }
}
\ No newline at end of file
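Note: the wire format of AggregateResult is plain Writable (two longs followed
by the key-value list), which ProtoBufConverter then wraps into a protobuf
ByteString. A round-trip sketch of that contract, illustrative only and assuming
the same package as AggregateResult, using the Guava streams the converter
already relies on:

    import java.io.IOException;

    import com.google.common.io.ByteArrayDataInput;
    import com.google.common.io.ByteArrayDataOutput;
    import com.google.common.io.ByteStreams;

    public class AggregateResultRoundTrip {
        public static AggregateResult roundTrip(AggregateResult original) throws IOException {
            // write() emits startTimestamp, stopTimestamp, then the GroupbyKeyValue list
            ByteArrayDataOutput out = ByteStreams.newDataOutput();
            original.write(out);
            byte[] wire = out.toByteArray();

            // readFields() restores the same fields in the same order
            AggregateResult copy = new AggregateResult();
            copy.readFields(ByteStreams.newDataInput(wire));
            return copy;
        }
    }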
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
index a68a592..306a6d1 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
@@ -21,28 +21,20 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
/**
* <h1>AggregateResultCallback Interface</h1>
- *
* Merge coprocessor results from different regions and generate final aggregate result
* <br/>
- *
- * @see org.apache.hadoop.hbase.client.HTableInterface
- * coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable) throws IOException, Throwable;
- *
+ * @see org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback)
*/
-public interface AggregateResultCallback extends Batch.Callback<AggregateProtos.AggregateResult>{
- /**
- * Generate final result after callback from region servers
- *
- * @return AggregateResult
- */
+public interface AggregateResultCallback extends Batch.Callback<AggregateProtos.AggregateResult> {
+ /**
+ * Generate final result after callback from region servers.
+ *
+ * @return AggregateResult
+ */
AggregateResult result();
/**
- * Compatible for older callback interface in 0.94 or older
- *
- * @param region
- * @param row
- * @param result
+ * Compatible with the older callback interface in HBase 0.94 and earlier.
*/
void update(byte[] region, byte[] row, AggregateResult result);
}
\ No newline at end of file
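Note: a concrete callback receives one partial result per region via update()
and folds them into the final answer. A minimal sketch, assuming the same
package as AggregateResult and ProtoBufConverter; real merging must also
combine group-by keys per aggregate function, which this sketch deliberately
skips:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
    import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;

    public class CollectingAggregateResultCallback implements AggregateResultCallback {
        private final List<GroupbyKeyValue> merged = new ArrayList<GroupbyKeyValue>();
        private long startTimestamp = 0;
        private long stopTimestamp = 0;

        @Override
        public void update(byte[] region, byte[] row, AggregateProtos.AggregateResult pbResult) {
            try {
                // Decode the PB envelope back into Writable form, then reuse
                // the 0.94-compatible overload below.
                update(region, row, ProtoBufConverter.fromPBResult(pbResult));
            } catch (IOException e) {
                throw new RuntimeException("Failed to decode region result", e);
            }
        }

        @Override
        public void update(byte[] region, byte[] row, AggregateResult partial) {
            merged.addAll(partial.getKeyValues());
            if (startTimestamp == 0 || partial.getStartTimestamp() < startTimestamp) {
                startTimestamp = partial.getStartTimestamp();
            }
            if (partial.getStopTimestamp() > stopTimestamp) {
                stopTimestamp = partial.getStopTimestamp();
            }
        }

        @Override
        public AggregateResult result() {
            AggregateResult finalResult = new AggregateResult();
            finalResult.setKeyValues(merged);
            finalResult.setStartTimestamp(startTimestamp);
            finalResult.setStopTimestamp(stopTimestamp);
            return finalResult;
        }
    }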
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
index 060d3ba..c3c57ed 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
@@ -32,44 +32,63 @@ import java.util.ArrayList;
import java.util.List;
/**
- * The protocol adapter for migrating from hbase-0.94 to hbase-0.96+
- *
- * @since 6/3/15
+ * The protocol adapter for <code>hbase-0.98</code> and <code>protobuf-2.5</code>.
*/
public final class ProtoBufConverter {
public static AggregateResult fromPBResult(AggregateProtos.AggregateResult pbResult) throws IOException {
- ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbResult.getByteArray().toByteArray());;
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbResult.getByteArray().toByteArray());
AggregateResult result = new AggregateResult();
result.readFields(byteArrayDataInput);
return result;
}
- public static AggregateProtos.AggregateRequest toPBRequest(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields) throws IOException {
+ public static AggregateProtos.AggregateRequest toPBRequest(
+ EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+ List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields) throws IOException {
AggregateProtos.AggregateRequest.Builder builder = AggregateProtos.AggregateRequest.newBuilder()
.setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition)))
.setScan(toPBScan(scan));
- for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField);
- for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
- for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField);
+ for (String groupbyField : groupbyFields) {
+ builder.addGroupbyFields(groupbyField);
+ }
+
+ for (byte[] funcTypeBytes : aggregateFuncTypesBytes) {
+ builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
+ }
+
+ for (String aggField : aggregatedFields) {
+ builder.addAggregatedFields(aggField);
+ }
return builder.build();
}
public static ByteString writableToByteString(Writable writable) throws IOException {
- ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();;
+ ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
writable.write(dataOutput);
return ByteString.copyFrom(dataOutput.toByteArray());
}
- public static AggregateProtos.TimeSeriesAggregateRequest toPBTimeSeriesRequest(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields, long startTime, long endTime, long intervalMin) throws IOException {
- AggregateProtos.TimeSeriesAggregateRequest.Builder builder = AggregateProtos.TimeSeriesAggregateRequest.newBuilder()
+ public static AggregateProtos.TimeSeriesAggregateRequest toPBTimeSeriesRequest(
+ EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+ List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields,
+ long startTime, long endTime, long intervalMin) throws IOException {
+ AggregateProtos.TimeSeriesAggregateRequest.Builder builder =
+ AggregateProtos.TimeSeriesAggregateRequest.newBuilder()
.setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition)))
.setScan(toPBScan(scan));
- for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField);
- for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
- for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField);
+ for (String groupbyField : groupbyFields) {
+ builder.addGroupbyFields(groupbyField);
+ }
+ for (byte[] funcTypeBytes : aggregateFuncTypesBytes) {
+ builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
+ }
+ for (String aggField : aggregatedFields) {
+ builder.addAggregatedFields(aggField);
+ }
builder.setStartTime(startTime);
builder.setEndTime(endTime);
@@ -79,33 +98,36 @@ public final class ProtoBufConverter {
}
public static EntityDefinition fromPBEntityDefinition(AggregateProtos.EntityDefinition entityDefinition) throws IOException {
- ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(entityDefinition.getByteArray().toByteArray());;
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(entityDefinition.getByteArray().toByteArray());
EntityDefinition result = new EntityDefinition();
result.readFields(byteArrayDataInput);
return result;
}
- public static List<String> fromPBStringList(com.google.protobuf.ProtocolStringList groupbyFieldsList) {
- List<String> result = new ArrayList<>(groupbyFieldsList.size());
- for(ByteString byteString:groupbyFieldsList.asByteStringList()){
- result.add(byteString.toStringUtf8());
- }
- return result;
+ // /**
+ // * For protobuf-2.6
+ // */
+ // public static List<String> fromPBStringList(com.google.protobuf.ProtocolStringList groupbyFieldsList) {
+ // List<String> result = new ArrayList<>(groupbyFieldsList.size());
+ // for(ByteString byteString:groupbyFieldsList.asByteStringList()){
+ // result.add(byteString.toStringUtf8());
+ // }
+ // return result;
+ // }
+
+ public static List<String> fromPBStringList(List<String> groupbyFieldsList) {
+ return groupbyFieldsList;
}
public static List<byte[]> fromPBByteArrayList(List<ByteString> aggregateFuncTypesList) {
List<byte[]> bytesArrayList = new ArrayList<>(aggregateFuncTypesList.size());
- for(ByteString byteString:aggregateFuncTypesList){
+ for (ByteString byteString : aggregateFuncTypesList) {
bytesArrayList.add(byteString.toByteArray());
}
return bytesArrayList;
}
- /**
- *
- * @param scan
- * @return
- */
public static Scan fromPBScan(ClientProtos.Scan scan) throws IOException {
return ProtobufUtil.toScan(scan);
}
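Design note: the adapter keeps Writable payloads opaque inside protobuf (see
writableToByteString above), so the generated .proto schema stays stable while
the Writable classes evolve. Building a raw group-by request then looks like
the following sketch; the field names and the sum function are example values
only, and the byte encoding assumes the enum constant name is the wire form,
matching AggregateFunctionType.fromBytesList above:

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.eagle.log.entity.meta.EntityDefinition;
    import org.apache.eagle.query.aggregate.AggregateFunctionType;
    import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
    import org.apache.hadoop.hbase.client.Scan;

    public class RequestBuildSketch {
        public static AggregateProtos.AggregateRequest build(EntityDefinition ed, Scan scan) throws IOException {
            return ProtoBufConverter.toPBRequest(
                    ed,
                    scan,
                    Arrays.asList("host"),                                      // group-by fields
                    Arrays.asList(AggregateFunctionType.sum.name().getBytes()), // aggregate function types
                    Arrays.asList("value"));                                    // aggregated fields
        }
    }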
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
deleted file mode 100755
index af213e9..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package eagle.storage.hbase.query.coprocessor;
-//
-//import eagle.log.entity.meta.EntityDefinition;
-//import eagle.query.aggregate.AggregateFunctionType;
-//import eagle.query.aggregate.raw.GroupbyKeyValue;
-//import eagle.query.aggregate.raw.RawAggregator;
-//import eagle.query.aggregate.timeseries.TimeSeriesAggregator;
-//import eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-//import eagle.storage.hbase.query.coprocessor.impl.AbstractAggregateEndPoint;
-//import hadoop.eagle.common.DateTimeUtil;
-//import com.google.protobuf.RpcCallback;
-//import com.google.protobuf.RpcController;
-//import org.apache.hadoop.hbase.client.Scan;
-//import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-//import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-//import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import java.io.IOException;
-//import java.util.List;
-//
-///**
-// * Coprocessor EndPoint of protocol <code>AggregateProtocol</code>
-// *
-// * <br/>
-// * <h2>Deployment:</h2>
-// *
-// * Firstly deploy jar files to cluster on local file system or HDFS.<br/><br/>
-// * Secondly configure in <code>hbase-site.xml</code> as following:
-// * <pre>
-// * <property>
-// * <name>hbase.coprocessor.region.classes</name>
-// * <value>AggregateProtocolEndPoint</value>
-// * </property>
-// * </pre>
-// * Or register on related hbase tables
-// * <pre>
-// * hbase(main):005:0> alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'
-// * </pre>
-// *
-// * <h2>Reference:</h2>
-// * <a href="https://blogs.apache.org/hbase/entry/coprocessor_introduction">
-// * Coprocessor Introduction
-// * (Authors: Trend Micro Hadoop Group: Mingjie Lai, Eugene Koontz, Andrew Purtell)
-// * </a> <br/><br/>
-// *
-// * @see AggregateProtocol
-// *
-//// * @since : 10/31/14,2014
-// */
-//@SuppressWarnings("unused")
-//public class AggregateProtocolEndPoint extends AbstractAggregateEndPoint {
-// private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
-// /**
-// *
-// * @param entityDefinition
-// * @param scan
-// * @param groupbyFields
-// * @param aggregateFuncTypes
-// * @param aggregatedFields
-// * @return
-// * @throws Exception
-// */
-// @Override
-// public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
-//// LOG.info("Using coprocessor instance: "+this);
-// checkNotNull(entityDefinition, "entityDefinition");
-// String serviceName = entityDefinition.getService();
-// LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
-// if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-// long _start = System.currentTimeMillis();
-// final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition);
-// InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
-//
-// List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-// AggregateResult result = new AggregateResult();
-// result.setKeyValues(keyValues);
-// result.setStartTimestamp(report.getStartTimestamp());
-// result.setStopTimestamp(report.getStopTimestamp());
-//
-// long _stop = System.currentTimeMillis();
-// LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-//
-// return result;
-// }
-//
-// /**
-// * TODO: refactor time series aggregator to remove dependency of business logic entity class
-// *
-// * @param entityDefinition
-// * @param scan
-// * @param groupbyFields
-// * @param aggregateFuncTypes
-// * @param aggregatedFields
-// * @param intervalMin
-// * @return
-// * @throws Exception
-// */
-// @Override
-// public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException {
-//// LOG.info("Using coprocessor instance: "+this);
-// checkNotNull(entityDefinition, "entityDefinition");
-// String serviceName = entityDefinition.getService();
-// LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin +
-// " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
-// if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-// long _start = System.currentTimeMillis();
-// final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin);
-// InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator);
-// List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-//
-// AggregateResult result = new AggregateResult();
-// result.setKeyValues(keyValues);
-// result.setStartTimestamp(report.getStartTimestamp());
-// result.setStopTimestamp(report.getStopTimestamp());
-//
-// long _stop = System.currentTimeMillis();
-// LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-//
-// return result;
-// }
-//}
\ No newline at end of file