You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:09 UTC
[02/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java
new file mode 100755
index 0000000..ae9ecef
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/common/SplitFullScanEntityReader.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.common;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericEntityBatchReader;
+import org.apache.eagle.log.entity.RowkeyBuilder;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.query.ListQueryCompiler;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.EagleBase64Wrapper;
+
+/**
+ * Support stream based entity read. Internally it splits entity fetching to multiple threads to improve
+ * the performance. However, it doesn't support multi-threading for client to read entities from result set.
+ *
+ */
+public class SplitFullScanEntityReader<ENTITY extends TaggedLogAPIEntity> {
+
+ // class members
+ public static final int DEFAULT_BUFFER_SIZE = 10 * 1000;
+ public static final int MAX_WRITE_TIME_OUT_IN_SECONDS = 60;
+ private static final Logger LOG = LoggerFactory.getLogger(SplitFullScanEntityReader.class);
+ private static final TaggedLogAPIEntity COMPLETED_ENTITY = new TaggedLogAPIEntity();
+
+ // instance members
+ private final int splits;
+ private final String query;
+ private final long startTime;
+ private final long endTime;
+ private final String startRowkey;
+ private final int pageSize;
+ private final int bufferSize;
+
+    /**
+     * Creates a reader whose time range is given as human-readable date strings.
+     * Each string is converted to seconds by
+     * {@code DateTimeUtil.humanDateToSecondsWithoutException} and scaled to milliseconds
+     * before delegating to the millisecond-based constructor; the default buffer size is used.
+     */
+    public SplitFullScanEntityReader(String query,
+                                     String startTime, String endTime,
+                                     int splits, String startRowkey, int pageSize) {
+        this(query,
+             DateTimeUtil.humanDateToSecondsWithoutException(startTime) * 1000,
+             DateTimeUtil.humanDateToSecondsWithoutException(endTime) * 1000,
+             splits, startRowkey, pageSize);
+    }
+
+    /**
+     * Creates a reader for the given millisecond time range using {@link #DEFAULT_BUFFER_SIZE}
+     * as the capacity of the result queue.
+     */
+    public SplitFullScanEntityReader(String query, long startTime, long endTime,
+                                     int splits, String startRowkey, int pageSize) {
+        this(query, startTime, endTime, splits, startRowkey, pageSize, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a reader; this constructor only stores its arguments.
+     *
+     * @param query       list query text, compiled later by {@code createSplitThreads()}
+     * @param startTime   start of the time range, in milliseconds
+     * @param endTime     end of the time range, in milliseconds
+     * @param splits      number of time slots the range is divided into
+     * @param startRowkey Base64-encoded rowkey to resume from, or {@code null}
+     * @param pageSize    page size handed to every per-slot search condition
+     * @param bufferSize  capacity of the bounded queue backing the result set
+     */
+    public SplitFullScanEntityReader(String query, long startTime, long endTime,
+                                     int splits, String startRowkey, int pageSize, int bufferSize) {
+        this.query = query;
+        this.splits = splits;
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.startRowkey = startRowkey;
+        this.pageSize = pageSize;
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * Starts one fetch thread per split reader and returns the result set they write into.
+     * When no reader is produced, the completion marker is queued immediately so the
+     * returned result set reports completion on the first read.
+     *
+     * @return the result set backed by a bounded queue of {@code bufferSize} elements
+     * @throws Exception whatever {@link #createSplitThreads()} throws
+     */
+    public EntityResultSet<ENTITY> read() throws Exception {
+        final BlockingQueue<TaggedLogAPIEntity> buffer = new ArrayBlockingQueue<TaggedLogAPIEntity>(bufferSize);
+        final EntityResultSet<ENTITY> resultSet = new EntityResultSet<ENTITY>(buffer);
+        final List<GenericEntityBatchReader> readers = createSplitThreads();
+
+        if (readers.isEmpty()) {
+            // Nothing to fetch: signal completion right away.
+            resultSet.getQueue().add(COMPLETED_ENTITY);
+            return resultSet;
+        }
+
+        // Shared counters: threads still running, and entities fetched so far.
+        final AtomicInteger pendingThreads = new AtomicInteger(readers.size());
+        final AtomicInteger fetchedEntities = new AtomicInteger(0);
+        for (GenericEntityBatchReader reader : readers) {
+            new EntityFetchThread<ENTITY>(reader, pendingThreads, fetchedEntities, resultSet).start();
+        }
+        return resultSet;
+    }
+
+    /**
+     * Compiles the query and builds one {@link GenericEntityBatchReader} per time slot.
+     * The range {@code [startTime, endTime)} is divided into {@code splits} slots of equal
+     * length; the last slot is extended to {@code endTime} so that the remainder lost by
+     * integer division is still scanned. Slots that start after the timestamp encoded in
+     * {@code startRowkey} are skipped.
+     *
+     * @return the readers, one per non-skipped slot; may be empty
+     * @throws IllegalArgumentException if the entity is unknown, the query spans several
+     *         partitions, or {@code splits} is not positive
+     * @throws Exception anything thrown while compiling the query or decoding the rowkey
+     */
+    protected List<GenericEntityBatchReader> createSplitThreads() throws Exception {
+
+        final List<GenericEntityBatchReader> readers = new ArrayList<GenericEntityBatchReader>();
+        final ListQueryCompiler comp = new ListQueryCompiler(query);
+        final EntityDefinition entityDef = EntityDefinitionManager.getEntityByServiceName(comp.serviceName());
+        if (entityDef == null) {
+            throw new IllegalArgumentException("Invalid entity name: " + comp.serviceName());
+        }
+
+        // TODO: For now we don't support one query to query multiple partitions. In future
+        // if partition is defined for the entity, internally We need to spawn multiple
+        // queries and send one query for each search condition for each partition
+        final List<String[]> partitionValues = comp.getQueryPartitionValues();
+        partitionConstraintValidate(partitionValues, query);
+
+        long lastTimestamp = Long.MAX_VALUE;
+        if (startRowkey != null) {
+            final byte[] lastRowkey = EagleBase64Wrapper.decode(startRowkey);
+            lastTimestamp = RowkeyBuilder.getTimestamp(lastRowkey, entityDef);
+        }
+
+        // Fail with a clear message instead of an ArithmeticException on the division below.
+        if (splits <= 0) {
+            throw new IllegalArgumentException("splits must be positive, but was: " + splits);
+        }
+
+        final long duration = (endTime - startTime) / splits;
+        for (int i = 0; i < splits; ++i) {
+
+            final long slotStartTime = startTime + (i * duration);
+            if (slotStartTime > lastTimestamp) {
+                // ignore this slot
+                continue;
+            }
+            // Integer division truncates 'duration'; pin the last slot to endTime so the
+            // tail of the requested range is not silently dropped.
+            final long slotEndTime = (i == splits - 1) ? endTime : startTime + ((i + 1) * duration);
+            final SearchCondition condition = new SearchCondition();
+            // Slot boundaries are passed on as human-date strings at second granularity.
+            final String slotStartTimeString = DateTimeUtil.secondsToHumanDate(slotStartTime / 1000);
+            final String slotEndTimeString = DateTimeUtil.secondsToHumanDate(slotEndTime / 1000);
+            condition.setStartTime(slotStartTimeString);
+            condition.setEndTime(slotEndTimeString);
+
+            condition.setFilter(comp.filter());
+            condition.setQueryExpression(comp.getQueryExpression());
+            if (partitionValues != null && !partitionValues.isEmpty()) {
+                // All entries were validated to be identical, so the first one is representative.
+                condition.setPartitionValues(Arrays.asList(partitionValues.get(0)));
+            }
+            // Should be careful to the startRowkey setting. Only set startRowkey when
+            // lastTimestamp is within the slot time range.
+            // NOTE(review): the check below compares against the whole query range
+            // (startTime/endTime), not this slot's range (slotStartTime/slotEndTime) as the
+            // comment above describes - confirm which one is intended before changing it.
+            if (startRowkey != null && lastTimestamp >= startTime && lastTimestamp < endTime) {
+                condition.setStartRowkey(startRowkey);
+            }
+            condition.setPageSize(pageSize);
+
+            if (comp.hasAgg()) {
+                // Aggregation queries output the group-by fields followed by the aggregated fields.
+                List<String> groupbyFields = comp.groupbyFields();
+                List<String> outputFields = new ArrayList<String>();
+                if (groupbyFields != null) {
+                    outputFields.addAll(groupbyFields);
+                }
+                outputFields.addAll(comp.aggregateFields());
+                condition.setOutputFields(outputFields);
+            } else {
+                condition.setOutputFields(comp.outputFields());
+            }
+            readers.add(new GenericEntityBatchReader(comp.serviceName(), condition));
+        }
+        return readers;
+    }
+
+
+    /**
+     * Rejects queries that address more than one partition. Every entry after the first
+     * must repeat the first entry's values position by position; a {@code null} value in
+     * the first entry is rejected as well.
+     *
+     * @param partitionValues partition value arrays extracted from the query, may be {@code null}
+     * @param query           original query text, used only in the error message
+     * @throws IllegalArgumentException if the entries do not all match the first one
+     */
+    private static void partitionConstraintValidate(List<String[]> partitionValues, String query) {
+        if (partitionValues == null || partitionValues.size() <= 1) {
+            return;
+        }
+        final String[] reference = partitionValues.get(0);
+        for (String[] candidate : partitionValues.subList(1, partitionValues.size())) {
+            for (int idx = 0; idx < reference.length; ++idx) {
+                final String expected = reference[idx];
+                if (expected != null && expected.equals(candidate[idx])) {
+                    continue;
+                }
+                final String errMsg = "One query for multiple partitions is NOT allowed for now! Query: " + query;
+                LOG.error(errMsg);
+                throw new IllegalArgumentException(errMsg);
+            }
+        }
+    }
+
+
+ @SuppressWarnings("unchecked")
+ public static class EntityResultSet<ENTITY extends TaggedLogAPIEntity> {
+ private static final long DEFAULT_TIMEOUT_IN_MS = 1000;
+
+ private boolean fetchCompleted = false;
+ private final BlockingQueue<TaggedLogAPIEntity> queue;
+ private volatile Exception ex = null;
+
+ public EntityResultSet(BlockingQueue<TaggedLogAPIEntity> queue) {
+ this.queue = queue;
+ }
+
+ public boolean hasMoreData() {
+ return queue.size() > 0 || (!fetchCompleted);
+ }
+
+ public ENTITY next(long timeout, TimeUnit unit) throws InterruptedException {
+ if (fetchCompleted) {
+ return null;
+ }
+ final TaggedLogAPIEntity entity = queue.poll(timeout, unit);
+ if (COMPLETED_ENTITY.equals(entity)) {
+ fetchCompleted = true;
+ return null;
+ }
+ return (ENTITY)entity;
+ }
+
+ public ENTITY next() throws Exception {
+ TaggedLogAPIEntity entity = null;
+ while (!fetchCompleted) {
+ try {
+ entity = queue.poll(DEFAULT_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
+ if (COMPLETED_ENTITY.equals(entity)) {
+ fetchCompleted = true;
+ if (ex != null) {
+ throw ex;
+ }
+ return null;
+ }
+ if (entity != null) {
+ return (ENTITY)entity;
+ }
+ } catch (InterruptedException ex) {
+ // Just ignore
+ }
+ }
+ return null;
+ }
+
+ void setException(Exception ex) {
+ this.ex = ex;
+ }
+
+ BlockingQueue<TaggedLogAPIEntity> getQueue() {
+ return queue;
+ }
+ }
+
+ private static class EntityFetchThread<ENTITY extends TaggedLogAPIEntity> extends Thread {
+
+ private final GenericEntityBatchReader reader;
+ private final AtomicInteger threadCount;
+ private final AtomicInteger entityCount;
+ private final EntityResultSet<ENTITY> resultSet;
+
+ private EntityFetchThread(GenericEntityBatchReader reader, AtomicInteger threadCount, AtomicInteger entityCount, EntityResultSet<ENTITY> resultSet) {
+ this.reader = reader;
+ this.threadCount = threadCount;
+ this.entityCount = entityCount;
+ this.resultSet = resultSet;
+ }
+
+ @Override
+ public void run() {
+ try {
+ final List<ENTITY> entities = reader.read();
+ entityCount.addAndGet(entities.size());
+ for (ENTITY entity : entities) {
+ if (!resultSet.getQueue().offer(entity, MAX_WRITE_TIME_OUT_IN_SECONDS, TimeUnit.SECONDS)) {
+ resultSet.setException(new IOException("Write entity to queue timeout"));
+ resultSet.getQueue().add(COMPLETED_ENTITY);
+ }
+ }
+ } catch (Exception ex) {
+ resultSet.setException(ex);
+ resultSet.getQueue().add(COMPLETED_ENTITY);
+ } finally {
+ final int count = threadCount.decrementAndGet();
+ if (count == 0) {
+ resultSet.getQueue().add(COMPLETED_ENTITY);
+ LOG.info("Total fetched " + entityCount.get() + " entities");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
new file mode 100644
index 0000000..43302c8
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import com.sun.jersey.core.header.FormDataContentDisposition;
+import com.sun.jersey.multipart.FormDataParam;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.storage.DataStorage;
+import org.apache.eagle.storage.DataStorageManager;
+import org.apache.eagle.storage.exception.IllegalDataStorageException;
+import org.apache.eagle.storage.operation.*;
+import org.apache.eagle.storage.result.ModifyResult;
+import org.apache.eagle.storage.result.QueryResult;
+import com.sun.jersey.api.json.JSONWithPadding;
+import org.apache.commons.lang.time.StopWatch;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.type.TypeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.GenericEntity;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @since 3/18/15
+ */
+@Path(GenericEntityServiceResource.ROOT_PATH)
+@SuppressWarnings("unchecked")
+public class GenericEntityServiceResource {
+ public final static String ROOT_PATH = "/entities";
+ public final static String JSONP_PATH = "jsonp";
+ public final static String DELETE_ENTITIES_PATH = "delete";
+ public final static String ROWKEY_PATH = "rowkey";
+
+ public final static String FIRST_TIMESTAMP = "firstTimestamp";
+ public final static String LAST_TIMESTAMP = "lastTimestamp";
+ public final static String ELAPSEDMS = "elapsedms";
+ public final static String TOTAL_RESULTS = "totalResults";
+
+ private final static Logger LOG = LoggerFactory.getLogger(GenericEntityServiceResource.class);
+
+ private List<? extends TaggedLogAPIEntity> unmarshalEntitiesByServie(InputStream inputStream,EntityDefinition entityDefinition) throws IllegalAccessException, InstantiationException, IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, entityDefinition.getEntityClass()));
+ }
+
+ private List<String> unmarshalAsStringlist(InputStream inputStream) throws IllegalAccessException, InstantiationException, IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, String.class));
+ }
+
+    /**
+     * Creates entities of the given service from the JSON request body.
+     *
+     * @param inputStream request body, deserialized as a list of the entity class
+     *                    registered for {@code serviceName}
+     * @param serviceName service name used to look up the entity definition
+     * @return response holding the identifiers reported by the create statement together
+     *         with {@code totalResults} and {@code elapsedms} meta when the statement
+     *         succeeds, or the caught exception when anything fails
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity create(InputStream inputStream,
+                                                 @QueryParam("serviceName") String serviceName){
+        GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        Map<String,Object> meta = new HashMap<>();
+        StopWatch stopWatch = new StopWatch();
+        try {
+            stopWatch.start();
+            EntityDefinition entityDefinition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+
+            if(entityDefinition == null){
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(inputStream, entityDefinition);
+            DataStorage dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+            CreateStatement createStatement = new CreateStatement(entities,entityDefinition);
+            ModifyResult<String> result = createStatement.execute(dataStorage);
+            if(result.isSuccess()) {
+                List<String> keys = result.getIdentifiers();
+                if(keys != null) {
+                    // Set once; the original repeated this call on two consecutive lines.
+                    response.setObj(keys, String.class);
+                    meta.put(TOTAL_RESULTS,keys.size());
+                }else{
+                    meta.put(TOTAL_RESULTS,0);
+                }
+                meta.put(ELAPSEDMS,stopWatch.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        }finally {
+            stopWatch.stop();
+        }
+        return response;
+    }
+
+    /**
+     * Creates entities of the given service from an uploaded multipart file part named
+     * {@code file}.
+     *
+     * @param fileInputStream content of the uploaded part, deserialized as a list of the
+     *                        entity class registered for {@code serviceName}
+     * @param cdh             content disposition of the uploaded part; not read by this method
+     * @param serviceName     service name used to look up the entity definition
+     * @return response holding the identifiers reported by the create statement together
+     *         with {@code totalResults} and {@code elapsedms} meta when the statement
+     *         succeeds, or the caught exception when anything fails
+     */
+    @POST
+    @Consumes({MediaType.MULTIPART_FORM_DATA})
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity create(@FormDataParam("file") InputStream fileInputStream,
+                                                 @FormDataParam("file") FormDataContentDisposition cdh,
+                                                 @QueryParam("serviceName") String serviceName) {
+        GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        Map<String,Object> meta = new HashMap<>();
+        StopWatch stopWatch = new StopWatch();
+        try {
+            stopWatch.start();
+            EntityDefinition entityDefinition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+
+            if(entityDefinition == null){
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(fileInputStream, entityDefinition);
+            DataStorage dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+            CreateStatement createStatement = new CreateStatement(entities,entityDefinition);
+            ModifyResult<String> result = createStatement.execute(dataStorage);
+            if(result.isSuccess()) {
+                List<String> keys = result.getIdentifiers();
+                if(keys != null) {
+                    // Set once; the original repeated this call on two consecutive lines.
+                    response.setObj(keys, String.class);
+                    meta.put(TOTAL_RESULTS,keys.size());
+                }else{
+                    meta.put(TOTAL_RESULTS,0);
+                }
+                meta.put(ELAPSEDMS,stopWatch.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        }finally {
+            stopWatch.stop();
+        }
+        return response;
+    }
+
+    /**
+     * Updates entities of the given service from the JSON request body.
+     *
+     * @param inputStream request body, deserialized as a list of the entity class
+     *                    registered for {@code serviceName}
+     * @param serviceName service name used to look up the entity definition
+     * @return response holding the identifiers reported by the update statement together
+     *         with {@code totalResults} and {@code elapsedms} meta when the statement
+     *         succeeds, or the caught exception when anything fails
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity update(InputStream inputStream,
+                                                 @QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        final Map<String,Object> meta = new HashMap<>();
+        final StopWatch timer = new StopWatch();
+        try {
+            timer.start();
+            final EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+            if (definition == null) {
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            final List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(inputStream, definition);
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+
+            final UpdateStatement statement = new UpdateStatement(entities, definition);
+            final ModifyResult<String> outcome = statement.execute(storage);
+            if (outcome.isSuccess()) {
+                final List<String> identifiers = outcome.getIdentifiers();
+                int total = 0;
+                if (identifiers != null) {
+                    response.setObj(identifiers, String.class);
+                    total = identifiers.size();
+                }
+                meta.put(TOTAL_RESULTS, total);
+                meta.put(ELAPSEDMS, timer.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        } finally {
+            timer.stop();
+        }
+        return response;
+    }
+
+    /**
+     * Updates entities of the given service from an uploaded multipart file part named
+     * {@code file}.
+     *
+     * @param fileInputStream content of the uploaded part, deserialized as a list of the
+     *                        entity class registered for {@code serviceName}
+     * @param cdh             content disposition of the uploaded part; not read by this method
+     * @param serviceName     service name used to look up the entity definition
+     * @return response holding the identifiers reported by the update statement together
+     *         with {@code totalResults} and {@code elapsedms} meta when the statement
+     *         succeeds, or the caught exception when anything fails
+     */
+    @PUT
+    @Consumes({MediaType.MULTIPART_FORM_DATA})
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity update(@FormDataParam("file") InputStream fileInputStream,
+                                                 @FormDataParam("file") FormDataContentDisposition cdh,
+                                                 @QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        final Map<String,Object> meta = new HashMap<>();
+        final StopWatch timer = new StopWatch();
+        try {
+            timer.start();
+            final EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+            if (definition == null) {
+                throw new IllegalArgumentException("entity definition of service "+serviceName+" not found");
+            }
+
+            final List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(fileInputStream, definition);
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+
+            final UpdateStatement statement = new UpdateStatement(entities, definition);
+            final ModifyResult<String> outcome = statement.execute(storage);
+            if (outcome.isSuccess()) {
+                final List<String> identifiers = outcome.getIdentifiers();
+                int total = 0;
+                if (identifiers != null) {
+                    response.setObj(identifiers, String.class);
+                    total = identifiers.size();
+                }
+                meta.put(TOTAL_RESULTS, total);
+                meta.put(ELAPSEDMS, timer.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        } finally {
+            timer.stop();
+        }
+        return response;
+    }
+
+
+
+    /**
+     * Looks up entities of a service by a single rowkey value.
+     *
+     * @param value rowkey value
+     * @param serviceName entity service name; a {@code null} value is reported as an error
+     * @return response carrying the query data, entity type and timing/size meta when the
+     *         query succeeds, or the caught exception when anything fails
+     */
+    @GET
+    @Path(ROWKEY_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity search(@QueryParam("value") String value,@QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+        final Map<String,Object> meta = new HashMap<>();
+        // Stays null when validation fails before timing starts.
+        StopWatch timer = null;
+        try {
+            if (serviceName == null) {
+                throw new IllegalArgumentException("serviceName is null");
+            }
+            final RowkeyQueryStatement statement = new RowkeyQueryStatement(value, serviceName);
+            timer = new StopWatch();
+            timer.start();
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+            if (storage == null) {
+                LOG.error("Data storage is null");
+                throw new IllegalDataStorageException("data storage is null");
+            }
+            final QueryResult<?> outcome = statement.execute(storage);
+            if (outcome.isSuccess()) {
+                meta.put(FIRST_TIMESTAMP, outcome.getFirstTimestamp());
+                meta.put(LAST_TIMESTAMP, outcome.getLastTimestamp());
+                meta.put(TOTAL_RESULTS, outcome.getSize());
+                meta.put(ELAPSEDMS, timer.getTime());
+                response.setObj(outcome.getData());
+                response.setType(outcome.getEntityType());
+                response.setSuccess(true);
+                response.setMeta(meta);
+            }
+        } catch (Exception e) {
+            response.setException(e);
+            LOG.error(e.getMessage(),e);
+        } finally {
+            if (timer != null) {
+                timer.stop();
+            }
+        }
+        return response;
+    }
+
+    /**
+     * Looks up entities of a service by a list of rowkey values sent as a JSON string
+     * array in the request body.
+     *
+     * @param inputStream request body containing the rowkey values
+     * @param serviceName entity service name; a {@code null} value is reported as an error
+     * @return response carrying the query data, entity type and timing/size meta when the
+     *         query succeeds, or the caught exception when anything fails
+     */
+    @POST
+    @Path(ROWKEY_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity search(InputStream inputStream,@QueryParam("serviceName") String serviceName){
+        final GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+        final Map<String,Object> meta = new HashMap<>();
+        // Stays null when validation or unmarshalling fails before timing starts.
+        StopWatch timer = null;
+        try {
+            if (serviceName == null) {
+                throw new IllegalArgumentException("serviceName is null");
+            }
+
+            final List<String> rowkeys = unmarshalAsStringlist(inputStream);
+            final RowkeyQueryStatement statement = new RowkeyQueryStatement(rowkeys, serviceName);
+
+            timer = new StopWatch();
+            timer.start();
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+            if (storage == null) {
+                LOG.error("Data storage is null");
+                throw new IllegalDataStorageException("Data storage is null");
+            }
+            final QueryResult<?> outcome = statement.execute(storage);
+            if (outcome.isSuccess()) {
+                meta.put(FIRST_TIMESTAMP, outcome.getFirstTimestamp());
+                meta.put(LAST_TIMESTAMP, outcome.getLastTimestamp());
+                meta.put(TOTAL_RESULTS, outcome.getSize());
+                meta.put(ELAPSEDMS, timer.getTime());
+                response.setObj(outcome.getData());
+                response.setType(outcome.getEntityType());
+                response.setSuccess(true);
+                response.setMeta(meta);
+            }
+        } catch (Exception e) {
+            response.setException(e);
+            LOG.error(e.getMessage(),e);
+        } finally {
+            if (timer != null) {
+                timer.stop();
+            }
+        }
+        return response;
+    }
+
+
+ /**
+ *
+ * @param query
+ * @param startTime
+ * @param endTime
+ * @param pageSize
+ * @param startRowkey
+ * @param treeAgg
+ * @param timeSeries
+ * @param intervalmin
+ * @param top
+ * @param filterIfMissing
+ * @param parallel
+ * @param metricName
+ * @param verbose
+ * @return
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @SuppressWarnings("unchecked")
+ public GenericServiceAPIResponseEntity search(@QueryParam("query") String query,
+ @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+ @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+ @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+ @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+ @QueryParam("filterIfMissing") boolean filterIfMissing,
+ @QueryParam("parallel") int parallel,
+ @QueryParam("metricName") String metricName,
+ @QueryParam("verbose") Boolean verbose){
+ RawQuery rawQuery = RawQuery.build()
+ .query(query)
+ .startTime(startTime)
+ .endTime(endTime)
+ .pageSize(pageSize)
+ .startRowkey(startRowkey)
+ .treeAgg(treeAgg)
+ .timeSeries(timeSeries)
+ .intervalMin(intervalmin)
+ .top(top)
+ .filerIfMissing(filterIfMissing)
+ .parallel(parallel)
+ .metricName(metricName)
+ .verbose(verbose)
+ .done();
+
+ QueryStatement queryStatement = new QueryStatement(rawQuery);
+ GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+ Map<String,Object> meta = new HashMap<>();
+
+ DataStorage dataStorage;
+ StopWatch stopWatch = new StopWatch();
+ try {
+ stopWatch.start();
+ dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+ if(dataStorage==null){
+ LOG.error("Data storage is null");
+ throw new IllegalDataStorageException("data storage is null");
+ }
+
+ QueryResult<?> result = queryStatement.execute(dataStorage);
+ if(result.isSuccess()){
+ meta.put(FIRST_TIMESTAMP, result.getFirstTimestamp());
+ meta.put(LAST_TIMESTAMP, result.getLastTimestamp());
+ meta.put(TOTAL_RESULTS, result.getSize());
+ meta.put(ELAPSEDMS,stopWatch.getTime());
+
+ response.setObj(result.getData());
+ response.setType(result.getEntityType());
+ response.setSuccess(true);
+ response.setMeta(meta);
+ return response;
+ }
+ } catch (Exception e) {
+ response.setException(e);
+ LOG.error(e.getMessage(),e);
+ }finally {
+ stopWatch.stop();
+ }
+ return response;
+ }
+
+    /**
+     * JSONP variant of the generic query endpoint: runs the same query as
+     * {@code search(...)} with identical parameters and wraps the result with the given
+     * callback name.
+     *
+     * @param query           query text
+     * @param startTime       start of the queried time range
+     * @param endTime         end of the queried time range
+     * @param pageSize        page size forwarded to the query
+     * @param startRowkey     rowkey forwarded to the query
+     * @param treeAgg         tree-aggregation flag forwarded to the query
+     * @param timeSeries      time-series flag forwarded to the query
+     * @param intervalmin     interval forwarded to the query
+     * @param top             top-N value forwarded to the query
+     * @param filterIfMissing filter-if-missing flag forwarded to the query
+     * @param parallel        parallelism value forwarded to the query
+     * @param metricName      metric name forwarded to the query
+     * @param verbose         verbose flag forwarded to the query
+     * @param callback        JSONP callback name
+     * @return the query response wrapped as {@code JSONWithPadding}
+     */
+    @GET
+    @Path(JSONP_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public JSONWithPadding searchWithJsonp(@QueryParam("query") String query,
+                                           @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+                                           @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+                                           @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+                                           @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+                                           @QueryParam("filterIfMissing") boolean filterIfMissing,
+                                           @QueryParam("parallel") int parallel,
+                                           @QueryParam("metricName") String metricName,
+                                           @QueryParam("verbose") Boolean verbose,
+                                           @QueryParam("callback") String callback){
+        final GenericServiceAPIResponseEntity payload = search(
+                query, startTime, endTime, pageSize, startRowkey,
+                treeAgg, timeSeries, intervalmin, top,
+                filterIfMissing, parallel, metricName, verbose);
+        // Anonymous subclass keeps the generic type information for serialization.
+        final GenericEntity<GenericServiceAPIResponseEntity> wrapped =
+                new GenericEntity<GenericServiceAPIResponseEntity>(payload){};
+        return new JSONWithPadding(wrapped, callback);
+    }
+
+ /**
+ * TODO
+ *
+ * Delete by query
+ *
+ * @return
+ */
+ @DELETE
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public GenericServiceAPIResponseEntity deleteByQuery(@QueryParam("query") String query,
+ @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+ @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+ @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+ @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+ @QueryParam("filterIfMissing") boolean filterIfMissing,
+ @QueryParam("parallel") int parallel,
+ @QueryParam("metricName") String metricName,
+ @QueryParam("verbose") Boolean verbose){
+ RawQuery rawQuery = RawQuery.build()
+ .query(query)
+ .startTime(startTime)
+ .endTime(endTime)
+ .pageSize(pageSize)
+ .startRowkey(startRowkey)
+ .treeAgg(treeAgg)
+ .timeSeries(timeSeries)
+ .intervalMin(intervalmin)
+ .top(top)
+ .filerIfMissing(filterIfMissing)
+ .parallel(parallel)
+ .metricName(metricName)
+ .verbose(verbose)
+ .done();
+
+ GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+ Map<String,Object> meta = new HashMap<String, Object>();
+ DataStorage dataStorage = null;
+ StopWatch stopWatch = new StopWatch();
+ try {
+ stopWatch.start();
+ dataStorage = DataStorageManager.getDataStorageByEagleConfig();
+ if(dataStorage==null){
+ LOG.error("Data storage is null");
+ throw new IllegalDataStorageException("Data storage is null");
+ }
+
+ DeleteStatement deleteStatement = new DeleteStatement(rawQuery);
+ ModifyResult<String> deleteResult = deleteStatement.execute(dataStorage);
+ if(deleteResult.isSuccess()){
+ meta.put(ELAPSEDMS, stopWatch.getTime());
+ response.setObj(deleteResult.getIdentifiers(),String.class);
+ response.setSuccess(true);
+ response.setMeta(meta);
+ }
+ return response;
+ } catch (Exception e) {
+ response.setException(e);
+ LOG.error(e.getMessage(),e);
+ }finally {
+ stopWatch.stop();
+ }
+ return response;
+ }
+
+    /**
+     * Deletes entities given in the request body, either as a JSON array of ids or as a
+     * JSON array of entities.
+     *
+     * Uses "POST /entities/delete" instead of "DELETE /entities" to work around the jersey
+     * DELETE issue for requests with a body.
+     *
+     * @param inputStream request body: a string array when deleting by id, otherwise a
+     *                    list of the entity class registered for {@code serviceName}
+     * @param serviceName entity service name
+     * @param deleteById  whether the body contains ids; {@code null} is treated as {@code false}
+     * @return response holding the identifiers reported by the delete statement together
+     *         with {@code totalResults} and {@code elapsedms} meta when the statement
+     *         succeeds, or the caught exception when anything fails
+     */
+    @POST
+    @Path(DELETE_ENTITIES_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity deleteEntities(InputStream inputStream,
+                                                 @QueryParam("serviceName") String serviceName,
+                                                 @QueryParam("byId") Boolean deleteById){
+        final GenericServiceAPIResponseEntity<String> response = new GenericServiceAPIResponseEntity<String>();
+        final Map<String,Object> meta = new HashMap<String, Object>();
+        final boolean byId = deleteById != null && deleteById;
+        final StopWatch timer = new StopWatch();
+
+        try {
+            timer.start();
+            final DataStorage storage = DataStorageManager.getDataStorageByEagleConfig();
+            final DeleteStatement statement = new DeleteStatement(serviceName);
+
+            if (!byId) {
+                LOG.info("Deleting "+serviceName+" by entities");
+                final EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName(serviceName);
+                if (definition == null) {
+                    throw new IllegalArgumentException("Entity definition of service " + serviceName + " not found");
+                }
+                final List<? extends TaggedLogAPIEntity> entities = unmarshalEntitiesByServie(inputStream, definition);
+                statement.setEntities(entities);
+            } else {
+                LOG.info("Deleting "+serviceName+" by ids");
+                final List<String> ids = unmarshalAsStringlist(inputStream);
+                statement.setIds(ids);
+            }
+
+            final ModifyResult<String> outcome = statement.execute(storage);
+            if (outcome.isSuccess()) {
+                final List<String> identifiers = outcome.getIdentifiers();
+                int total = 0;
+                if (identifiers != null) {
+                    response.setObj(identifiers, String.class);
+                    total = identifiers.size();
+                }
+                meta.put(TOTAL_RESULTS, total);
+                meta.put(ELAPSEDMS, timer.getTime());
+                response.setMeta(meta);
+                response.setSuccess(true);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            response.setException(e);
+        } finally {
+            timer.stop();
+        }
+        return response;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java
new file mode 100755
index 0000000..c10c28d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericObjectMapperProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ser.FilterProvider;
+
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.ext.ContextResolver;
+import javax.ws.rs.ext.Provider;
+
+ /**
+  * JAX-RS context resolver that hands out one shared Jackson {@link ObjectMapper}
+  * for every type it is asked about.
+  */
+ @Provider
+ @Produces(MediaType.APPLICATION_JSON)
+ public class GenericObjectMapperProvider implements ContextResolver<ObjectMapper> {
+     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+     static {
+         // Install the filter provider exposed by TaggedLogAPIEntity when the class is loaded.
+         setFilter(TaggedLogAPIEntity.getFilterProvider());
+         // set more filter here
+     }
+
+     /**
+      * Replaces the filter provider used by the shared mapper.
+      *
+      * @param filter filter provider passed on to {@code ObjectMapper.setFilters}
+      */
+     public static void setFilter(FilterProvider filter) {
+         OBJECT_MAPPER.setFilters(filter);
+     }
+
+     /**
+      * @param clazz type being resolved; not inspected
+      * @return the shared mapper instance
+      */
+     @Override
+     public ObjectMapper getContext(Class<?> clazz) {
+         return OBJECT_MAPPER;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java
new file mode 100755
index 0000000..b14dc22
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/ListQueryResource.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.*;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.query.GenericQuery;
+import org.apache.eagle.query.ListQueryCompiler;
+import org.apache.eagle.service.common.EagleExceptionWrapper;
+import org.apache.eagle.storage.hbase.query.GenericQueryBuilder;
+import org.apache.eagle.common.DateTimeUtil;
+import com.sun.jersey.api.json.JSONWithPadding;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.query.aggregate.timeseries.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.GenericEntity;
+import javax.ws.rs.core.MediaType;
+import java.util.*;
+
+@Path("list")
+public class ListQueryResource {
+ private static final Logger LOG = LoggerFactory.getLogger(ListQueryResource.class);
+
+ /**
+  * Overload kept for callers of the old interface that has no verbose parameter.
+  * Forwards all arguments unchanged to the 13-argument {@code listQuery} with
+  * {@code verbose} set to true.
+  */
+ public ListQueryAPIResponseEntity listQuery(@QueryParam("query") String query,
+         @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+         @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+         @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+         @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+         @QueryParam("filterIfMissing") boolean filterIfMissing,
+         @QueryParam("parallel") int parallel,
+         @QueryParam("metricName") String metricName) {
+     final Boolean verboseByDefault = Boolean.TRUE;
+     return listQuery(
+             query, startTime, endTime, pageSize, startRowkey,
+             treeAgg, timeSeries, intervalmin, top,
+             filterIfMissing, parallel, metricName, verboseByDefault);
+ }
+
+ /**
+ * TODO refactor the code structure, now it's messy
+ * @param query
+ * @param startTime
+ * @param endTime
+ * @param pageSize
+ * @param startRowkey
+ * @param treeAgg
+ * @param timeSeries
+ * @param intervalmin
+ * @return
+ */
+ @GET
+ @Produces({MediaType.APPLICATION_JSON})
+ public ListQueryAPIResponseEntity listQuery(@QueryParam("query") String query,
+ @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+ @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+ @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+ @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+ @QueryParam("filterIfMissing") boolean filterIfMissing,
+ @QueryParam("parallel") int parallel,
+ @QueryParam("metricName") String metricName,
+ @QueryParam("verbose") Boolean verbose) {
+ if(!EagleConfigFactory.load().isCoprocessorEnabled())
+ return listQueryWithoutCoprocessor(query,startTime,endTime,pageSize,startRowkey,treeAgg,timeSeries,intervalmin,top,filterIfMissing,parallel,metricName,verbose);
+
+ StopWatch watch = new StopWatch();
+ watch.start();
+ ListQueryAPIResponseEntity result = new ListQueryAPIResponseEntity();
+ try{
+ validateQueryParameters(startRowkey, pageSize);
+
+ // 1. Compile query to parse parameters and HBase Filter
+ ListQueryCompiler comp = new ListQueryCompiler(query, filterIfMissing);
+ String serviceName = comp.serviceName();
+
+ SearchCondition condition = new SearchCondition();
+ condition.setOutputVerbose(verbose == null || verbose);
+ condition.setOutputAlias(comp.getOutputAlias());
+ condition.setFilter(comp.filter());
+ condition.setQueryExpression(comp.getQueryExpression());
+ if(comp.sortOptions() == null && top > 0) {
+ LOG.warn("Parameter \"top\" is only used for sort query! Ignore top parameter this time since it's not a sort query");
+ }
+
+ // 2. Initialize partition values if set
+ // TODO: For now we don't support one query to query multiple partitions. In future
+ // if partition is defined for the entity, internally We need to spawn multiple
+ // queries and send one query for each search condition for each partition
+ final List<String[]> partitionValues = comp.getQueryPartitionValues();
+ if (partitionValues != null) {
+ condition.setPartitionValues(Arrays.asList(partitionValues.get(0)));
+ }
+
+ // 3. Set time range if it's timeseries service
+ EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+ if(ed.isTimeSeries()){
+ // TODO check timestamp exists for timeseries or topology data
+ condition.setStartTime(startTime);
+ condition.setEndTime(endTime);
+ }
+
+ // 4. Set HBase start scanning rowkey if given
+ condition.setStartRowkey(startRowkey);
+
+ // 5. Set page size
+ condition.setPageSize(pageSize);
+
+ // 6. Generate output,group-by,aggregated fields
+ List<String> outputFields = comp.outputFields();
+ List<String> groupbyFields = comp.groupbyFields();
+ List<String> aggregateFields = comp.aggregateFields();
+ Set<String> filterFields = comp.getFilterFields();
+
+ // Start to generate output fields list {
+ condition.setOutputAll(comp.isOutputAll());
+ if(outputFields == null) outputFields = new ArrayList<String>();
+ if(comp.hasAgg()){
+ if(groupbyFields != null) outputFields.addAll(groupbyFields);
+ if(aggregateFields != null) outputFields.addAll(aggregateFields);
+ if(GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName) && !outputFields.contains(GenericMetricEntity.VALUE_FIELD)){
+ outputFields.add(GenericMetricEntity.VALUE_FIELD);
+ }
+ }
+ if(filterFields!=null) outputFields.addAll(filterFields);
+ condition.setOutputFields(outputFields);
+ if(comp.isOutputAll()){
+ LOG.info("Output fields: ALL");
+ }else{
+ LOG.info("Output fields: " + StringUtils.join(outputFields, ","));
+ }
+ // } END
+
+ // 7. Build GenericQuery
+ GenericQuery reader = GenericQueryBuilder
+ .select(outputFields)
+ .from(serviceName, metricName).where(condition)
+ .groupBy(
+ comp.hasAgg(),
+ groupbyFields,
+ comp.aggregateFunctionTypes(),
+ aggregateFields)
+ .timeSeries(timeSeries, intervalmin)
+ .treeAgg(treeAgg)
+ .orderBy(comp.sortOptions(),comp.sortFunctions(),comp.sortFields()).top(top)
+ .parallel(parallel)
+ .build();
+
+ // 8. Fill response object
+ List entities = reader.result();
+ result.setObj(entities);
+ result.setTotalResults(entities.size());
+ result.setSuccess(true);
+ result.setLastTimestamp(reader.getLastTimestamp());
+ result.setFirstTimestamp(reader.getFirstTimeStamp());
+ }catch(Exception ex){
+ LOG.error("Fail executing list query", ex);
+ result.setException(EagleExceptionWrapper.wrap(ex));
+ result.setSuccess(false);
+ return result;
+ }finally{
+ watch.stop();
+ result.setElapsedms(watch.getTime());
+ }
+ LOG.info("Query done " + watch.getTime() + " ms");
+ return result;
+ }
+
+ /**
+  * Legacy list query that builds the entity readers and aggregators directly in this class
+  * instead of going through the generic query builder.
+  *
+  * <p><b>TODO</b> remove the legacy deprecated implementation of listQueryWithoutCoprocessor
+  *
+  * <p>Depending on the compiled query and the flags, one of these paths is taken:
+  * plain list query, metric list query (requires {@code metricName}), flat aggregation
+  * (single-threaded or parallel), time-series aggregation, or hierarchical aggregation.
+  * Exceptions are logged and reported through the response object instead of being thrown.
+  *
+  * @see #listQuery(String, String, String, int, String, boolean, boolean, long, int, boolean, int, String,Boolean)
+  *
+  * @param query query string handed to {@link ListQueryCompiler}
+  * @param startTime start of the time range; applied only when the entity is a time series
+  * @param endTime end of the time range; applied only when the entity is a time series
+  * @param pageSize page size set on the search condition; must be positive
+  * @param startRowkey rowkey to start scanning from, may be null
+  * @param treeAgg selects the hierarchical aggregation path for aggregate queries
+  * @param timeSeries selects the time-series aggregation path when {@code treeAgg} is false
+  * @param intervalmin time-series bucket interval in minutes (converted to milliseconds here)
+  * @param top number of entries passed to the post-aggregation sort
+  * @param filterIfMissing forwarded to {@link ListQueryCompiler}
+  * @param parallel when positive, the multi-threaded stream reader is used where supported
+  * @param metricName metric name used by the metric readers
+  * @param verbose verbose-output flag; {@code null} is treated as {@code true}
+  * @return the response; on failure {@code success} is false and the wrapped exception is set
+  */
+ @GET
+ @Path("/legacy")
+ @Produces({MediaType.APPLICATION_JSON})
+ @Deprecated
+ public ListQueryAPIResponseEntity listQueryWithoutCoprocessor(@QueryParam("query") String query,
+         @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+         @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+         @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+         @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+         @QueryParam("filterIfMissing") boolean filterIfMissing,
+         @QueryParam("parallel") int parallel,
+         @QueryParam("metricName") String metricName,
+         @QueryParam("verbose") Boolean verbose) {
+     StopWatch watch = new StopWatch();
+     watch.start();
+     ListQueryAPIResponseEntity result = new ListQueryAPIResponseEntity();
+     try {
+         validateQueryParameters(startRowkey, pageSize);
+         ListQueryCompiler comp = new ListQueryCompiler(query, filterIfMissing);
+         String serviceName = comp.serviceName();
+
+         SearchCondition condition = new SearchCondition();
+         condition.setFilter(comp.filter());
+         condition.setQueryExpression(comp.getQueryExpression());
+         if (comp.sortOptions() == null && top > 0) {
+             LOG.warn("Parameter \"top\" is only used for sort query! Ignore top parameter this time since it's not a sort query");
+         }
+
+         // TODO: For now we don't support one query to query multiple partitions. In future
+         // if partition is defined for the entity, internally We need to spawn multiple
+         // queries and send one query for each search condition for each partition
+         final List<String[]> partitionValues = comp.getQueryPartitionValues();
+         // An empty list carries no partition to pick, so treat it like "not set".
+         if (partitionValues != null && !partitionValues.isEmpty()) {
+             condition.setPartitionValues(Arrays.asList(partitionValues.get(0)));
+         }
+         EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+         if (ed == null) {
+             // Report an unknown service explicitly instead of failing with a NullPointerException below.
+             throw new IllegalArgumentException("Entity definition of service " + serviceName + " not found");
+         }
+         if (ed.isTimeSeries()) {
+             // TODO check timestamp exists for timeseries or topology data
+             condition.setStartTime(startTime);
+             condition.setEndTime(endTime);
+         }
+         condition.setOutputVerbose(verbose == null || verbose);
+         condition.setOutputAlias(comp.getOutputAlias());
+         condition.setOutputAll(comp.isOutputAll());
+         condition.setStartRowkey(startRowkey);
+         condition.setPageSize(pageSize);
+
+         List<String> outputFields = comp.outputFields();
+         if (outputFields == null) {
+             outputFields = new ArrayList<String>();
+         }
+
+         final boolean metricService = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName);
+
+         /**
+          * TODO ugly logic, waiting for refactoring
+          */
+         if (!comp.hasAgg() && !metricService) { // pure list query
+             Set<String> filterFields = comp.getFilterFields();
+             if (filterFields != null) {
+                 outputFields.addAll(filterFields);
+             }
+             condition.setOutputFields(outputFields);
+             logOutputFields(condition);
+             GenericEntityBatchReader reader = new GenericEntityBatchReader(serviceName, condition);
+             List<? extends TaggedLogAPIEntity> entityList = reader.read();
+             result.setObj(entityList);
+             result.setTotalResults(entityList.size());
+             result.setSuccess(true);
+             result.setLastTimestamp(reader.getLastTimestamp());
+             result.setFirstTimestamp(reader.getFirstTimestamp());
+         } else if (!comp.hasAgg()) { // metric list query
+             // validate metric name
+             if (metricName == null || metricName.isEmpty()) {
+                 throw new IllegalArgumentException("metricName should not be empty for metric list query");
+             }
+             Set<String> filterFields = comp.getFilterFields();
+             if (filterFields != null) {
+                 outputFields.addAll(filterFields);
+             }
+             condition.setOutputFields(outputFields);
+             logOutputFields(condition);
+             GenericMetricEntityBatchReader reader = new GenericMetricEntityBatchReader(metricName, condition);
+             List<? extends TaggedLogAPIEntity> entityList = reader.read();
+             result.setObj(entityList);
+             result.setTotalResults(entityList.size());
+             result.setSuccess(true);
+             result.setLastTimestamp(reader.getLastTimestamp());
+             result.setFirstTimestamp(reader.getFirstTimestamp());
+         } else if (!treeAgg && !timeSeries && parallel <= 0) { // non time-series based aggregate query, not hierarchical
+             List<String> groupbyFields = comp.groupbyFields();
+             appendAggregationOutputFields(serviceName, outputFields, groupbyFields, comp.getFilterFields(), null, comp.aggregateFields());
+
+             FlatAggregator agg = new FlatAggregator(groupbyFields, comp.aggregateFunctionTypes(), comp.aggregateFields());
+             StreamReader reader;
+             if (ed.getMetricDefinition() == null) {
+                 reader = new GenericEntityStreamReader(serviceName, condition);
+             } else { // metric aggregation need metric reader
+                 reader = new GenericMetricEntityDecompactionStreamReader(metricName, condition);
+             }
+             condition.setOutputFields(outputFields);
+             logOutputFields(condition);
+             reader.register(agg);
+             reader.readAsStream();
+             fillFlatAggregateResult(result, agg, comp, top, reader);
+         } else if (!treeAgg && !timeSeries && parallel > 0) { // TODO ugly branch, let us refactor
+             List<String> groupbyFields = comp.groupbyFields();
+             appendAggregationOutputFields(serviceName, outputFields, groupbyFields, comp.getFilterFields(), null, comp.aggregateFields());
+             condition.setOutputFields(outputFields);
+             logOutputFields(condition);
+             FlatAggregator agg = new FlatAggregator(groupbyFields, comp.aggregateFunctionTypes(), comp.aggregateFields());
+             // The aggregator is fed from several reader threads, so it is wrapped in a synchronized listener.
+             EntityCreationListener listener = EntityCreationListenerFactory.synchronizedEntityCreationListener(agg);
+             StreamReader reader = new GenericEntityStreamReaderMT(serviceName, condition, parallel);
+             reader.register(listener);
+             reader.readAsStream();
+             fillFlatAggregateResult(result, agg, comp, top, reader);
+         } else if (!treeAgg && timeSeries) { // time-series based aggregate query, not hierarchical
+             List<String> groupbyFields = comp.groupbyFields();
+             List<String> aggregateFields = comp.aggregateFields();
+             appendAggregationOutputFields(serviceName, outputFields, groupbyFields, comp.getFilterFields(), comp.sortFields(), aggregateFields);
+             StreamReader reader;
+             if (ed.getMetricDefinition() == null) {
+                 if (parallel <= 0) { // TODO ugly quick win
+                     reader = new GenericEntityStreamReader(serviceName, condition);
+                 } else {
+                     reader = new GenericEntityStreamReaderMT(serviceName, condition, parallel);
+                 }
+             } else { // metric aggregation need metric reader
+                 reader = new GenericMetricEntityDecompactionStreamReader(metricName, condition);
+                 if (!outputFields.contains(GenericMetricEntity.VALUE_FIELD)) {
+                     outputFields.add(GenericMetricEntity.VALUE_FIELD);
+                 }
+             }
+             condition.setOutputFields(outputFields);
+             logOutputFields(condition);
+             TimeSeriesAggregator tsAgg = new TimeSeriesAggregator(groupbyFields, comp.aggregateFunctionTypes(), aggregateFields,
+                     DateTimeUtil.humanDateToDate(condition.getStartTime()).getTime(), DateTimeUtil.humanDateToDate(condition.getEndTime()).getTime(), intervalmin*60*1000);
+             if (parallel <= 0) {
+                 reader.register(tsAgg);
+             } else {
+                 EntityCreationListener listener = EntityCreationListenerFactory.synchronizedEntityCreationListener(tsAgg);
+                 reader.register(listener);
+             }
+             // for sorting
+             FlatAggregator sortAgg = null;
+             if (comp.sortOptions() != null) {
+                 sortAgg = new FlatAggregator(groupbyFields, comp.sortFunctions(), comp.sortFields());
+                 if (parallel <= 0) {
+                     reader.register(sortAgg);
+                 } else {
+                     EntityCreationListener listener = EntityCreationListenerFactory.synchronizedEntityCreationListener(sortAgg);
+                     reader.register(listener);
+                 }
+             }
+             reader.readAsStream();
+             ArrayList<Map.Entry<List<String>, List<double[]>>> obj = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
+             obj.addAll(tsAgg.getMetric().entrySet());
+             if (comp.sortOptions() == null) {
+                 result.setObj(obj);
+             } else { // has sort options
+                 result.setObj(TimeSeriesPostFlatAggregateSort.sort(sortAgg.result(), tsAgg.getMetric(), comp.sortOptions(), top));
+             }
+             result.setTotalResults(0);
+             result.setSuccess(true);
+             result.setLastTimestamp(reader.getLastTimestamp());
+             result.setFirstTimestamp(reader.getFirstTimestamp());
+         } else { // use hierarchical aggregate mode
+             List<String> groupbyFields = comp.groupbyFields();
+             appendAggregationOutputFields(serviceName, outputFields, groupbyFields, comp.getFilterFields(), null, comp.aggregateFields());
+             condition.setOutputFields(outputFields);
+             logOutputFields(condition);
+             GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, condition);
+             HierarchicalAggregator agg = new HierarchicalAggregator(groupbyFields, comp.aggregateFunctionTypes(), comp.aggregateFields());
+             reader.register(agg);
+             reader.readAsStream();
+             if (comp.sortOptions() == null) {
+                 result.setObj(agg.result());
+             } else { // has sort options
+                 result.setObj(PostHierarchicalAggregateSort.sort(agg.result(), comp.sortOptions()));
+             }
+             result.setTotalResults(0);
+             result.setSuccess(true);
+             result.setLastTimestamp(reader.getLastTimestamp());
+             result.setFirstTimestamp(reader.getFirstTimestamp());
+         }
+     } catch (Exception ex) {
+         LOG.error("Fail executing list query: " + query, ex);
+         result.setException(EagleExceptionWrapper.wrap(ex));
+         result.setSuccess(false);
+         return result;
+     } finally {
+         // Runs on both the success and the failure path, so the elapsed time is always reported.
+         watch.stop();
+         result.setElapsedms(watch.getTime());
+     }
+     LOG.info("Query done " + watch.getTime() + " ms");
+     return result;
+ }
+
+ /**
+  * Logs which fields the legacy query will output: "ALL" when the condition outputs everything,
+  * otherwise the comma-separated output field list.
+  */
+ private static void logOutputFields(SearchCondition condition) {
+     if (condition.isOutputAll()) {
+         LOG.info("Output: ALL");
+     } else {
+         LOG.info("Output: " + StringUtils.join(condition.getOutputFields(), ", "));
+     }
+ }
+
+ /**
+  * Appends group-by, filter, sort and aggregate fields (in that order, skipping null collections)
+  * to {@code outputFields}, then adds the metric value field for the generic metric service
+  * if it is not already present.
+  */
+ private static void appendAggregationOutputFields(String serviceName, List<String> outputFields,
+         List<String> groupbyFields, Set<String> filterFields,
+         List<String> sortFields, List<String> aggregateFields) {
+     if (groupbyFields != null) {
+         outputFields.addAll(groupbyFields);
+     }
+     if (filterFields != null) {
+         outputFields.addAll(filterFields);
+     }
+     if (sortFields != null) {
+         outputFields.addAll(sortFields);
+     }
+     if (aggregateFields != null) {
+         outputFields.addAll(aggregateFields);
+     }
+     if (GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(serviceName) && !outputFields.contains(GenericMetricEntity.VALUE_FIELD)) {
+         outputFields.add(GenericMetricEntity.VALUE_FIELD);
+     }
+ }
+
+ /**
+  * Copies the outcome of a flat aggregation into the response: the raw entry list when the query
+  * has no sort options, otherwise the sorted result limited by {@code top}.
+  */
+ private static void fillFlatAggregateResult(ListQueryAPIResponseEntity result, FlatAggregator agg,
+         ListQueryCompiler comp, int top, StreamReader reader) throws Exception {
+     ArrayList<Map.Entry<List<String>, List<Double>>> obj = new ArrayList<Map.Entry<List<String>, List<Double>>>();
+     obj.addAll(agg.result().entrySet());
+     if (comp.sortOptions() == null) {
+         result.setObj(obj);
+     } else { // has sort options
+         result.setObj(PostFlatAggregateSort.sort(agg.result(), comp.sortOptions(), top));
+     }
+     result.setTotalResults(0);
+     result.setSuccess(true);
+     result.setLastTimestamp(reader.getLastTimestamp());
+     result.setFirstTimestamp(reader.getFirstTimestamp());
+ }
+
+
+ /**
+  * JSONP variant of the list query: runs {@code listQuery} with a null metric name and wraps
+  * the response in a {@link JSONWithPadding} using the requested callback.
+  *
+  * <p>{@code verbose} is declared as {@link Boolean} rather than {@code boolean} so that an
+  * omitted query parameter arrives as {@code null} and gets the same default as the plain
+  * list endpoint (verbose output) instead of silently becoming {@code false}.
+  *
+  * @param callback JSONP callback name, read from the {@code update} query parameter
+  * @param verbose verbose-output flag; {@code null} is treated as {@code true} by {@code listQuery}
+  * @return the padded JSON response
+  */
+ @GET
+ @Path("/jsonp")
+ @Produces({"application/x-javascript", "application/json", "application/xml"})
+ public JSONWithPadding listQueryWithJsonp(@QueryParam("query") String query,
+         @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
+         @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey,
+         @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries,
+         @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top,
+         @QueryParam("filterIfMissing") boolean filterIfMissing,
+         @QueryParam("parallel") int parallel,
+         @QueryParam("update") String callback,
+         @QueryParam("verbose") Boolean verbose) {
+     ListQueryAPIResponseEntity result = listQuery(query, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, top, filterIfMissing, parallel, null, verbose);
+     return new JSONWithPadding(new GenericEntity<ListQueryAPIResponseEntity>(result){}, callback);
+ }
+
+ private void validateQueryParameters(String startRowkey, int pageSize){
+ if(pageSize <= 0){
+ throw new IllegalArgumentException("Positive pageSize value should be always provided. The list query format is:\n" + "eagle-service/rest/list?query=<querystring>&pageSize=10&startRowkey=xyz&startTime=xxx&endTime=xxx");
+ }
+
+ if(startRowkey != null && startRowkey.equals("null")){
+ LOG.warn("startRowkey being null string is not same to startRowkey == null");
+ }
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java
new file mode 100755
index 0000000..d5ab87f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/MetadataResource.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.generic;
+
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.MetricDefinition;
+import com.sun.jersey.api.model.AbstractResource;
+import com.sun.jersey.api.model.AbstractResourceMethod;
+import com.sun.jersey.api.model.AbstractSubResourceMethod;
+import com.sun.jersey.server.impl.modelapi.annotation.IntrospectionModeller;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.Map;
+
+/**
+ * @since : 7/3/14,2014
+ */
+@Path(MetadataResource.PATH_META)
+public class MetadataResource {
+ final static String PATH_META = "meta";
+ final static String PATH_RESOURCE = "resource";
+ final static String PATH_SERVICE = "service";
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response index(@Context Application application,
+ @Context HttpServletRequest request){
+ String basePath = request.getRequestURL().toString();
+ ObjectNode root = JsonNodeFactory.instance.objectNode();
+
+ root.put(PATH_RESOURCE,joinUri(basePath,PATH_RESOURCE));
+ root.put(PATH_SERVICE,joinUri(basePath,PATH_SERVICE));
+ return Response.ok().entity(root).build();
+ }
+
+ @GET
+ @Path(PATH_RESOURCE)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response listAllResourcesRoutes(@Context Application application,
+ @Context HttpServletRequest request){
+ String basePath = request.getRequestURL().toString();
+ basePath = basePath.substring(0,basePath.length() - PATH_META.length() - PATH_RESOURCE.length() -1);
+ ObjectNode root = JsonNodeFactory.instance.objectNode();
+ root.put("base",basePath);
+ ArrayNode resources = JsonNodeFactory.instance.arrayNode();
+ root.put( "resources", resources );
+
+ for ( Class<?> aClass : application.getClasses()){
+ if ( isAnnotatedResourceClass(aClass)){
+ AbstractResource resource = IntrospectionModeller.createResource(aClass);
+ ObjectNode resourceNode = JsonNodeFactory.instance.objectNode();
+ String uriPrefix = resource.getPath().getValue();
+
+ for ( AbstractSubResourceMethod srm : resource.getSubResourceMethods() ) {
+ String uri = uriPrefix + "/" + srm.getPath().getValue();
+ addTo( resourceNode, uri, srm, joinUri(basePath, uri) );
+ }
+
+ for ( AbstractResourceMethod srm : resource.getResourceMethods() ) {
+ addTo( resourceNode, uriPrefix, srm, joinUri( basePath, uriPrefix ) );
+ }
+ resources.add( resourceNode );
+ }
+ }
+
+ return Response.ok().entity( root ).build();
+ }
+
+ private String joinUri(String basePath, String uriPrefix) {
+ if(basePath.endsWith("/") && uriPrefix.startsWith("/")){
+ return basePath.substring(0,basePath.length()-2)+uriPrefix;
+ }else if(basePath.endsWith("/") || uriPrefix.startsWith("/")){
+ return basePath+ uriPrefix;
+ }
+ return basePath+"/"+uriPrefix;
+ }
+
+ private void addTo( ObjectNode resourceNode, String uriPrefix, AbstractResourceMethod srm, String path ){
+ if(resourceNode.get( uriPrefix ) == null){
+ ObjectNode inner = JsonNodeFactory.instance.objectNode();
+ inner.put("path", path);
+ inner.put("methods", JsonNodeFactory.instance.arrayNode());
+ resourceNode.put( uriPrefix, inner );
+ }
+
+ ((ArrayNode) resourceNode.get( uriPrefix ).get("methods")).add( srm.getHttpMethod() );
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private boolean isAnnotatedResourceClass( Class rc ){
+ if ( rc.isAnnotationPresent( Path.class ) ) {
+ return true;
+ }
+
+ for ( Class i : rc.getInterfaces() ) {
+ if ( i.isAnnotationPresent( Path.class ) ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @GET
+ @Path(PATH_SERVICE)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response listAllEntities(@Context Application application,
+ @Context HttpServletRequest request) throws Exception {
+ Map<String,EntityDefinition> entities = EntityDefinitionManager.entities();
+ ObjectNode root = JsonNodeFactory.instance.objectNode();
+
+ ArrayNode services = JsonNodeFactory.instance.arrayNode();
+
+ for(Map.Entry<String,EntityDefinition> entry : entities.entrySet()){
+// ObjectNode serviceNode = JsonNodeFactory.instance.objectNode();
+// serviceNode.put(entry.getKey(),entityDefationitionAsJson(entry.getValue()));
+ services.add(entityDefationitionAsJson(entry.getValue()));
+ }
+ root.put("count",entities.keySet().size());
+ root.put("services",services);
+ return Response.ok().entity(root).build();
+ }
+
+ private JsonNode entityDefationitionAsJson(EntityDefinition def) {
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+ node.put("service",def.getService());
+ node.put("entityClass",def.getEntityClass().getName());
+ node.put("table",def.getTable());
+ node.put("columnFamily",def.getColumnFamily());
+ node.put("prefix",def.getPrefix());
+ if(def.getPartitions()!=null){
+ node.put("partitions",arrayNode(def.getPartitions()));
+ }
+ node.put("isTimeSeries",def.isTimeSeries());
+
+ MetricDefinition mdf = def.getMetricDefinition();
+ if(mdf!=null){
+ node.put("interval", mdf.getInterval());
+ }
+
+ IndexDefinition[] indexDef = def.getIndexes();
+ if(indexDef!=null){
+ ArrayNode indexDefArray = JsonNodeFactory.instance.arrayNode();
+ for(IndexDefinition idef : indexDef){
+ ObjectNode idn = JsonNodeFactory.instance.objectNode();
+ idn.put("indexPrefix",idef.getIndexPrefix());
+
+ if(idef.getIndex()!=null){
+ ObjectNode index = JsonNodeFactory.instance.objectNode();
+ index.put("name",idef.getIndex().name());
+ index.put("columns",arrayNode(idef.getIndex().columns()));
+ idn.put("index",index);
+ }
+
+ indexDefArray.add(idn);
+ }
+ node.put("indexs",indexDefArray);
+ }
+ return node;
+ }
+
+ private ArrayNode arrayNode(String[] values){
+ ArrayNode an = JsonNodeFactory.instance.arrayNode();
+ for(String v:values){
+ an.add(v);
+ }
+ return an;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java
new file mode 100644
index 0000000..f3ff420
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/metric/EagleMetricResource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.metric;
+
+import java.util.List;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.entity.GenericCreateAPIResponseEntity;
+import org.apache.eagle.log.entity.GenericEntityWriter;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+/**
+ * REST resource that persists generic metric entities posted to {@code /metric}.
+ */
+@Path(EagleMetricResource.METRIC_URL_PATH)
+public class EagleMetricResource {
+    private static final Logger LOG = LoggerFactory.getLogger(EagleMetricResource.class);
+    public static final String METRIC_URL_PATH = "/metric";
+
+    /**
+     * Writes the posted metric entities through a {@link GenericEntityWriter} created for
+     * {@link GenericMetricEntity#GENERIC_METRIC_SERVICE}.
+     *
+     * <p>Failures are never propagated to the caller: any exception is logged and reported
+     * in the returned entity instead.
+     *
+     * @param entities metric entities read from the JSON request body
+     * @return response with the row keys returned by the writer and {@code success=true},
+     *         or {@code success=false} plus a description of the failure
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericCreateAPIResponseEntity createGenericMetricEntity(List<GenericMetricEntity> entities) {
+        GenericCreateAPIResponseEntity result = new GenericCreateAPIResponseEntity();
+        try {
+            GenericEntityWriter writer = new GenericEntityWriter(GenericMetricEntity.GENERIC_METRIC_SERVICE);
+            List<String> rowkeys = writer.write(entities);
+            result.setEncodedRowkeys(rowkeys);
+            result.setSuccess(true);
+        } catch (Exception ex) {
+            LOG.error("Fail writing Generic Metric entity", ex);
+            result.setSuccess(false);
+            // getMessage() is null for exceptions created without a detail message; fall back
+            // to toString() so a failed response always says what went wrong.
+            String message = ex.getMessage();
+            result.setException(message != null ? message : ex.toString());
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java
new file mode 100644
index 0000000..9480695
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowKeyQueryResource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.eagle.service.rowkey;
+
+import java.io.IOException;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.eagle.service.common.EagleExceptionWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.NoSuchRowException;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.RowkeyQueryAPIResponseEntity;
+import org.apache.eagle.log.entity.index.RowKeyLogReader;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.common.EagleBase64Wrapper;
+
+/**
+ * REST resource that looks up a single entity by its encoded row key.
+ *
+ * @since Jan 26, 2015
+ */
+@Path("/rowkeyquery")
+public class RowKeyQueryResource {
+    private static final Logger LOG = LoggerFactory.getLogger(RowKeyQueryResource.class);
+
+    /**
+     * Reads the row stored under {@code rowkey} and converts it into an entity of the
+     * service named by {@code query}.
+     *
+     * <p>Failures are never propagated to the caller: any exception is logged and reported
+     * in the returned entity instead.
+     *
+     * @param query service name used to resolve the {@link EntityDefinition}
+     * @param rowkey encoded row key, decoded with {@link EagleBase64Wrapper}
+     * @return response holding the entity and {@code success=true}, or {@code success=false}
+     *         plus the wrapped exception
+     */
+    @GET
+    @Produces({MediaType.APPLICATION_JSON})
+    public RowkeyQueryAPIResponseEntity getEntityByRowkey(@QueryParam("query") String query, @QueryParam("rowkey") String rowkey){
+        RowkeyQueryAPIResponseEntity result = new RowkeyQueryAPIResponseEntity();
+        RowKeyLogReader reader = null;
+
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(query);
+            reader = new RowKeyLogReader(ed, EagleBase64Wrapper.decode(rowkey));
+            reader.open();
+            InternalLog log = reader.read();
+            TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, ed);
+            result.setObj(entity);
+            result.setSuccess(true);
+        }
+        catch(NoSuchRowException ex){
+            LOG.error("rowkey " + ex.getMessage() + " does not exist!", ex);
+            result.setSuccess(false);
+            result.setException(EagleExceptionWrapper.wrap(ex));
+        }
+        catch(Exception ex){
+            LOG.error("Cannot read alert by rowkey", ex);
+            result.setSuccess(false);
+            result.setException(EagleExceptionWrapper.wrap(ex));
+        }
+        finally{
+            // reader stays null when resolving the definition or decoding the key failed.
+            if(reader != null){
+                try {
+                    reader.close();
+                } catch (IOException e) {
+                    // Closing is best-effort: the response is already decided, so the
+                    // failure is recorded instead of being silently dropped.
+                    LOG.warn("Fail closing rowkey reader", e);
+                }
+            }
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java
new file mode 100644
index 0000000..d4a6a42
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/rowkey/RowkeyResource.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.rowkey;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.eagle.service.common.EagleExceptionWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.RowkeyAPIEntity;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.old.GenericDeleter;
+import org.apache.eagle.log.entity.old.HBaseLogByRowkeyReader;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.eagle.common.service.POSTResultEntityBase;
+
+@Deprecated
+@Path("rowkey")
+public class RowkeyResource {
+ private static final Logger LOG = LoggerFactory.getLogger(RowkeyResource.class);
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public RowkeyAPIEntity inspectRowkey(@QueryParam("table") String table, @QueryParam("cf") String columnFamily,
+ @QueryParam("key") String key, @QueryParam("all") String all, @QueryParam("field") List<String> fields){
+ RowkeyAPIEntity entity = new RowkeyAPIEntity();
+ byte[] row = null;
+ boolean includingAllQualifiers = false;
+ if(all != null && all.equals("true"))
+ includingAllQualifiers = true;
+ HBaseLogByRowkeyReader getter = new HBaseLogByRowkeyReader(table, columnFamily, includingAllQualifiers, fields);
+ InternalLog log = null;
+ try{
+ getter.open();
+ row = EagleBase64Wrapper.decode(key);
+ log = getter.get(row);
+ }catch(Exception ex){
+ LOG.error("Cannot get rowkey", ex);
+ entity.setSuccess(false);
+ entity.setException(EagleExceptionWrapper.wrap(ex));
+ return entity;
+ }finally{
+ try{
+ getter.close();
+ }catch(Exception ex){}
+ }
+
+ Map<String, String> fieldNameValueMap = new TreeMap<String, String>();
+ entity.setFieldNameValueMap(fieldNameValueMap);
+ // populate qualifiers
+ Map<String, byte[]> qualifierValues = log.getQualifierValues();
+ for(Map.Entry<String, byte[]> qualifier : qualifierValues.entrySet()){
+ if(qualifier.getValue() != null){
+ fieldNameValueMap.put(qualifier.getKey(), new String(qualifier.getValue()));
+ }
+ }
+
+ // decode rowkey
+ // the first integer is prefix hashcode
+ entity.setPrefixHashCode(ByteUtil.bytesToInt(row, 0));
+ long ts = Long.MAX_VALUE-ByteUtil.bytesToLong(row, 4);
+ entity.setTimestamp(ts);
+ entity.setHumanTime(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(ts));
+ int offset = 4+8;
+ int len = row.length;
+ Map<Integer, Integer> tagNameHashValueHashMap = new HashMap<Integer, Integer>();
+ // TODO boundary check please
+ while(offset < len){
+ int tagNameHash = ByteUtil.bytesToInt(row, offset);
+ offset += 4;
+ int tagValueHash = ByteUtil.bytesToInt(row, offset);
+ offset += 4;
+ tagNameHashValueHashMap.put(tagNameHash, tagValueHash);
+ }
+
+ entity.setSuccess(true);
+ return entity;
+ }
+
+ /**
+ * for entities, the only required field is encodedRowkey
+ * @param table
+ * @param columnFamily
+ * @param entities
+ */
+ @DELETE
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public POSTResultEntityBase deleteEntityByEncodedRowkey(@QueryParam("table") String table, @QueryParam("cf") String columnFamily,
+ List<TaggedLogAPIEntity> entities){
+ GenericDeleter deleter = new GenericDeleter(table, columnFamily);
+ POSTResultEntityBase result = new POSTResultEntityBase();
+ try{
+ deleter.delete(entities);
+ }catch(Exception ex){
+ LOG.error("Fail deleting entity " + table + ":" + columnFamily, ex);
+ result.setSuccess(false);
+ result.setException(ex.getMessage());
+ return result;
+ }
+ result.setSuccess(true);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java
new file mode 100644
index 0000000..0e52a2e
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/selfcheck/EagleServiceSelfCheckAPIEntity.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.selfcheck;
+
+/**
+ * Bean exposing internal configuration or metrics of the service: the environment name and
+ * the HBase ZooKeeper quorum and client port, all held as plain strings.
+ */
+//@XmlRootElement
+//@XmlAccessorType(XmlAccessType.FIELD)
+//@XmlType(propOrder = {"env", "hbaseZookeeperQuorum", "hbaseZookeeperClientPort"})
+public class EagleServiceSelfCheckAPIEntity {
+    private String env;
+    private String hbaseZookeeperQuorum;
+    private String hbaseZookeeperClientPort;
+
+    /** @return the environment value, or {@code null} if it was never set */
+    public String getEnv() {
+        return this.env;
+    }
+
+    /** @param env the environment value to store */
+    public void setEnv(final String env) {
+        this.env = env;
+    }
+
+    /** @return the HBase ZooKeeper quorum, or {@code null} if it was never set */
+    public String getHbaseZookeeperQuorum() {
+        return this.hbaseZookeeperQuorum;
+    }
+
+    /** @param hbaseZookeeperQuorum the HBase ZooKeeper quorum to store */
+    public void setHbaseZookeeperQuorum(final String hbaseZookeeperQuorum) {
+        this.hbaseZookeeperQuorum = hbaseZookeeperQuorum;
+    }
+
+    /** @return the HBase ZooKeeper client port, or {@code null} if it was never set */
+    public String getHbaseZookeeperClientPort() {
+        return this.hbaseZookeeperClientPort;
+    }
+
+    /** @param hbaseZookeeperClientPort the HBase ZooKeeper client port to store */
+    public void setHbaseZookeeperClientPort(final String hbaseZookeeperClientPort) {
+        this.hbaseZookeeperClientPort = hbaseZookeeperClientPort;
+    }
+}