Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2018/04/09 12:28:06 UTC

[GitHub] carbondata pull request #2148: [CARBONDATA-2323][WIP] Distributed search mod...

GitHub user jackylk opened a pull request:

    https://github.com/apache/carbondata/pull/2148

    [CARBONDATA-2323][WIP] Distributed search mode using gRPC

    When a SQL statement contains only projection and filter, we can use RPC calls to run a distributed scan on the carbon files directly instead of using an RDD to do the query. This mode avoids RDD overhead such as RDD construction and DAG scheduling.
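
The scatter/gather shape of this mode can be illustrated with a minimal sketch that uses only the Java standard library. It is not the PR's gRPC code: the class `ScatterGatherSketch`, the method `scanBlock`, and the integer "blocks" are hypothetical stand-ins, and a local thread pool plays the role of the remote workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ScatterGatherSketch {

  /** Hypothetical worker-side scan: keep the values of one block that pass the filter. */
  static List<Integer> scanBlock(List<Integer> block, int minValue) {
    List<Integer> hits = new ArrayList<>();
    for (int value : block) {
      if (value >= minValue) {
        hits.add(value);
      }
    }
    return hits;
  }

  /** Fire one asynchronous call per worker, then collect the results in request order. */
  static List<Integer> search(Map<String, List<Integer>> blocksByWorker, int minValue) {
    // Assumption: a local pool stands in for the remote workers.
    ExecutorService workers = Executors.newFixedThreadPool(Math.max(1, blocksByWorker.size()));
    try {
      List<CompletableFuture<List<Integer>>> futures = new ArrayList<>();
      for (List<Integer> block : blocksByWorker.values()) {
        futures.add(CompletableFuture.supplyAsync(() -> scanBlock(block, minValue), workers));
      }
      List<Integer> output = new ArrayList<>();
      for (CompletableFuture<List<Integer>> future : futures) {
        output.addAll(future.join());
      }
      return output;
    } finally {
      workers.shutdown();
    }
  }
}
```

All requests are issued before any result is awaited, so the scans run concurrently and the caller only pays for the slowest worker.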
    
    
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
    
     - [ ] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/incubator-carbondata carbonstore-rpc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2148.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2148
    
----
commit 572cb918976ff110d16ed930264e54df620a302e
Author: Jacky Li <ja...@...>
Date:   2018-04-01T08:30:22Z

    support CarbonStore API
    
    fix style
    
    fix test

commit cbc8223ce276468b577212325015672e3a2472ad
Author: Jacky Li <ja...@...>
Date:   2018-04-09T12:03:00Z

    support gRPC search mode

----


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3873/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4354/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5093/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182779690
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -31,35 +31,53 @@
     import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
     import org.apache.carbondata.core.util.CarbonProperties;
     
    +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
    +
     /**
      * Below class will be used to execute the detail query and returns columnar vectors.
      */
     public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
    -  private static ExecutorService executorService;
    +  private static ExecutorService executorService = null;
     
       static {
    +    initThreadPool();
    +  }
    +
    +  private static synchronized void initThreadPool() {
         int nThread;
         try {
           nThread = Integer.parseInt(CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
    +              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                           CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
         } catch (NumberFormatException e) {
           nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
    -      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
    +      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
    +          + "Using the default value " + nThread);
         }
         if (nThread > 0) {
    -      executorService =  Executors.newFixedThreadPool(nThread);
    +      executorService = Executors.newFixedThreadPool(nThread);
    --- End diff --
    
    But CarbonThreadFactory only creates a Thread, not a ThreadPool. How can I use it?


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5038/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181624234
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.worker;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.DataMapChooser;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.protobuf.ByteString;
    +
    +/**
    + * Thread runnable for handling SearchRequest from master.
    + */
    +@InterfaceAudience.Internal
    +class SearchRequestHandler implements Runnable {
    +
    +  private static final LogService LOG =
    +      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
    +  private boolean running = true;
    +  private Queue<SearchService.SearchRequestContext> requestQueue;
    +
    +  SearchRequestHandler(Queue<SearchService.SearchRequestContext> requestQueue) {
    +    this.requestQueue = requestQueue;
    +  }
    +
    +  public void run() {
    +    while (running) {
    +      SearchService.SearchRequestContext requestContext = requestQueue.poll();
    +      if (requestContext == null) {
    +        try {
    +          Thread.sleep(10);
    +        } catch (InterruptedException e) {
    +          LOG.error(e);
    +        }
    +      } else {
    +        try {
    +          List<CarbonRow> rows = handleRequest(requestContext);
    +          sendSuccessResponse(requestContext, rows);
    +        } catch (IOException | InterruptedException e) {
    +          LOG.error(e);
    +          sendFailureResponse(requestContext, e);
    +        }
    +      }
    +    }
    +  }
    +
    +  public void stop() {
    +    running = false;
    +  }
    +
    +  /**
    +   * Builds {@link QueryModel} and read data from files
    +   */
    +  private List<CarbonRow> handleRequest(SearchService.SearchRequestContext requestContext)
    +      throws IOException, InterruptedException {
    +    SearchRequest request = requestContext.getRequest();
    +    TableInfo tableInfo = GrpcSerdes.deserialize(request.getTableInfo());
    +    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
    +    QueryModel queryModel = createQueryModel(table, request);
    +
    +    // the request contains CarbonMultiBlockSplit and reader will read multiple blocks
    +    // by using a thread pool
    +    CarbonMultiBlockSplit mbSplit = getMultiBlockSplit(request);
    +
    +    // If there is FGDataMap, prune the split by applying FGDataMap
    +    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
    --- End diff --
    
    I am adding an option in the Hadoop configuration so that Master will set it before doing getSplit.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4452/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3859/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4353/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182684714
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,14 +93,14 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    -        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
    +        rows.add(reader.getCurrentValue());
           }
         } catch (InterruptedException e) {
           throw new IOException(e);
         } finally {
           reader.close();
         }
    +    LOG.error("finished reading");
    --- End diff --
    
    Why is this log at ERROR level?


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5042/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3776/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4071/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5037/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3821/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5011/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181632923
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.master;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutput;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.block.Distributable;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.exception.InvalidConfigurationException;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +import org.apache.carbondata.store.protocol.EchoRequest;
    +import org.apache.carbondata.store.protocol.EchoResponse;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.protocol.ShutdownRequest;
    +import org.apache.carbondata.store.protocol.ShutdownResponse;
    +import org.apache.carbondata.store.protocol.WorkerGrpc;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.protobuf.ByteString;
    +import io.grpc.ManagedChannel;
    +import io.grpc.ManagedChannelBuilder;
    +import io.grpc.Server;
    +import io.grpc.ServerBuilder;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +
    +/**
    + * Master of CarbonSearch.
    + * It listens to {@link Master#DEFAULT_PORT} to wait for worker to register.
    + * And it provides search API to fire RPC call to workers.
    + */
    +@InterfaceAudience.Internal
    +public class Master {
    +
    +  private static final LogService LOG = LogServiceFactory.getLogService(Master.class.getName());
    +
    +  public static final int DEFAULT_PORT = 10020;
    +
    +  private Server registryServer;
    +
    +  private int port;
    +
    +  private Random random = new Random();
    +
    +  /** mapping of worker hostname to rpc stub */
    +  private Map<String, WorkerGrpc.WorkerFutureStub> workers;
    +
    +  public Master() {
    +    this(DEFAULT_PORT);
    +  }
    +
    +  public Master(int port) {
    +    this.port = port;
    +    this.workers = new ConcurrentHashMap<>();
    +  }
    +
    +  /** start service and listen on port passed in constructor */
    +  public void startService() throws IOException {
    +    if (registryServer == null) {
    +      /* The port on which the registryServer should run */
    +      registryServer = ServerBuilder.forPort(port)
    +          .addService(new RegistryService(this))
    +          .build()
    +          .start();
    +      LOG.info("Master started, listening on " + port);
    +      Runtime.getRuntime().addShutdownHook(new Thread() {
    +        @Override public void run() {
    +          // Use stderr here since the logger may have been reset by its JVM shutdown hook.
    +          LOG.info("*** shutting down gRPC Master since JVM is shutting down");
    +          stopService();
    +          LOG.info("*** Master shut down");
    +        }
    +      });
    +    }
    +  }
    +
    +  public void stopService() {
    +    if (registryServer != null) {
    +      registryServer.shutdown();
    +    }
    +  }
    +
    +  public void stopAllWorkers() throws IOException, ExecutionException, InterruptedException {
    +    ShutdownRequest request = ShutdownRequest.newBuilder()
    +        .setTrigger(ShutdownRequest.Trigger.USER)
    +        .build();
    +    for (Map.Entry<String, WorkerGrpc.WorkerFutureStub> worker : workers.entrySet()) {
    +      ListenableFuture<ShutdownResponse> future = worker.getValue().shutdown(request);
    +      ShutdownResponse response = future.get();
    +      if (response.getStatus() != ShutdownResponse.Status.SUCCESS) {
    +        LOG.error("failed to shutdown worker: " + response.getMessage());
    +        throw new IOException(response.getMessage());
    +      } else {
    +        workers.remove(worker.getKey());
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Await termination on the main thread since the grpc library uses daemon threads.
    +   */
    +  private void blockUntilShutdown() throws InterruptedException {
    +    if (registryServer != null) {
    +      registryServer.awaitTermination();
    +    }
    +  }
    +
    +  /** A new searcher is trying to register, add it to the map and connect to this searcher */
    +  void addWorker(String workerHostname, int port, int cores)
    +      throws ExecutionException, InterruptedException {
    +    Objects.requireNonNull(workerHostname);
    +
    +    LOG.info("trying to connect to searcher " + workerHostname + ":" + port);
    +    ManagedChannel channelToWorker = ManagedChannelBuilder.forAddress(workerHostname, port)
    +        .usePlaintext(true)
    +        .maxInboundMessageSize(200 * 1000 * 1000)
    +        .build();
    +    WorkerGrpc.WorkerFutureStub futureStub = WorkerGrpc.newFutureStub(channelToWorker);
    +
    +    // try to send a message to worker as a test
    +    tryEcho(futureStub);
    +    workers.put(workerHostname, futureStub);
    +  }
    +
    +  private void tryEcho(WorkerGrpc.WorkerFutureStub stub)
    +      throws ExecutionException, InterruptedException {
    +    EchoRequest request = EchoRequest.newBuilder().setMessage("hello").build();
    +    LOG.info("echo to searcher: " + request.getMessage());
    +    ListenableFuture<EchoResponse> response = stub.echo(request);
    +    try {
    +      LOG.info("echo from searcher: " + response.get().getMessage());
    +    } catch (InterruptedException | ExecutionException e) {
    +      LOG.error("failed to echo: " + e.getMessage());
    +      throw e;
    +    }
    +  }
    +
    +  /**
    +   * Execute search by firing RPC call to worker, return the result rows
    +   */
    +  public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter)
    +      throws IOException, InvalidConfigurationException, ExecutionException, InterruptedException {
    +    Objects.requireNonNull(table);
    +    Objects.requireNonNull(columns);
    +
    +    if (workers.size() == 0) {
    +      throw new IOException("No searcher is available");
    +    }
    +
    +    int queryId = random.nextInt();
    +
    +    // Build a SearchRequest
    +    SearchRequest.Builder builder = SearchRequest.newBuilder()
    +        .setQueryId(queryId)
    +        .setTableInfo(GrpcSerdes.serialize(table.getTableInfo()));
    +    for (String column : columns) {
    +      builder.addProjectColumns(column);
    +    }
    +    if (filter != null) {
    +      builder.setFilterExpression(GrpcSerdes.serialize(filter));
    +    }
    +
    +    // prune data and get a mapping of worker hostname to list of blocks,
    +    // add these blocks to the SearchRequest and fire the RPC call
    +    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter);
    +
    +    List<ListenableFuture<SearchResult>> futures = new ArrayList<>(nodeBlockMapping.size());
    +
    +    for (Map.Entry<String, List<Distributable>> entry : nodeBlockMapping.entrySet()) {
    +      String hostname = entry.getKey();
    +      List<Distributable> blocks = entry.getValue();
    +      CarbonMultiBlockSplit mbSplit = new CarbonMultiBlockSplit(blocks, hostname);
    +      ByteArrayOutputStream stream = new ByteArrayOutputStream();
    +      DataOutput dataOutput = new DataOutputStream(stream);
    +      mbSplit.write(dataOutput);
    +      builder.setSplits(ByteString.copyFrom(stream.toByteArray()));
    +
    +      SearchRequest request = builder.build();
    +
    +      // do RPC to worker asynchronously and concurrently
    +      ListenableFuture<SearchResult> future = workers.get(hostname).search(request);
    +      futures.add(future);
    +    }
    +
    +    // get all results from RPC response and return to caller
    +    List<CarbonRow> output = new LinkedList<>();
    +    for (ListenableFuture<SearchResult> future : futures) {
    +      SearchResult result = future.get();
    +      if (result.getQueryId() != queryId) {
    +        throw new IOException(String.format(
    +            "queryId in response does not match request: %d != %d", result.getQueryId(), queryId));
    +      }
    +      collectResult(result, output);
    +    }
    +    return output.toArray(new CarbonRow[output.size()]);
    +  }
    +
    +  /**
    +   * Prune data by using CarbonInputFormat.getSplit
    +   * Return a mapping of hostname to list of block
    +   */
    +  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns,
    +      Expression filter) throws IOException, InvalidConfigurationException {
    +    JobConf jobConf = new JobConf(new Configuration());
    +    Job job = new Job(jobConf);
    +    CarbonTableInputFormat<Object> format = CarbonInputFormatUtil.createCarbonTableInputFormat(
    +        job, table, columns, filter, null, null);
    +
    +    List<InputSplit> splits = format.getSplits(job);
    +    List<Distributable> distributables = new ArrayList<>(splits.size());
    +    for (InputSplit split : splits) {
    +      distributables.add(((CarbonInputSplit)split));
    +    }
    +    return CarbonLoaderUtil.nodeBlockMapping(
    +        distributables, -1, new ArrayList<String>(workers.keySet()),
    +        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
    +  }
    +
    +  /**
    +   * Fill result row to {@param output}
    +   */
    +  private void collectResult(SearchResult result,  List<CarbonRow> output) throws IOException {
    +    for (ByteString bytes : result.getRowList()) {
    +      CarbonRow row = GrpcSerdes.deserialize(bytes);
    +      output.add(row);
    +    }
    +  }
    +
    +  /** return hostname of all workers */
    +  public Set<String> getWorkers() {
    +    return workers.keySet();
    +  }
    +
    +  public static void main(String[] args) throws IOException, InterruptedException {
    --- End diff --
    
    yes


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3661/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4058/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4978/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183063006
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -31,35 +31,53 @@
     import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
     import org.apache.carbondata.core.util.CarbonProperties;
     
    +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
    +
     /**
      * Below class will be used to execute the detail query and returns columnar vectors.
      */
     public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
    -  private static ExecutorService executorService;
    +  private static ExecutorService executorService = null;
     
       static {
    +    initThreadPool();
    +  }
    +
    +  private static synchronized void initThreadPool() {
         int nThread;
         try {
           nThread = Integer.parseInt(CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
    +              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                           CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
         } catch (NumberFormatException e) {
           nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
    -      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
    +      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
    +          + "Using the default value " + nThread);
         }
         if (nThread > 0) {
    -      executorService =  Executors.newFixedThreadPool(nThread);
    +      executorService = Executors.newFixedThreadPool(nThread);
    --- End diff --
    
    Yes, you can pass a thread factory object to the thread pool and assign a pool name:
        ExecutorService executorService = Executors.newFixedThreadPool(1,
            new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
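
For readers without the CarbonData sources at hand, the same idea can be sketched with the JDK alone. `NamedThreadFactory` below is a hypothetical stand-in for `CarbonThreadFactory` (whose actual naming scheme is not shown in this thread), and the pool name "SearchModeScan" is only an illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolExample {

  /** Hypothetical stand-in for CarbonThreadFactory: names every thread it creates. */
  static final class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger(0);

    NamedThreadFactory(String poolName) {
      this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable task) {
      Thread thread = new Thread(task, poolName + "-" + counter.incrementAndGet());
      thread.setDaemon(true);
      return thread;
    }
  }

  /** Runs one task on a named fixed pool and returns the name of the thread that ran it. */
  static String runOnNamedPool(String poolName, int nThread) throws Exception {
    // The factory only creates threads; the pool decides when to ask for one.
    ExecutorService pool =
        Executors.newFixedThreadPool(nThread, new NamedThreadFactory(poolName));
    try {
      Callable<String> task = () -> Thread.currentThread().getName();
      return pool.submit(task).get(5, TimeUnit.SECONDS);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runOnNamedPool("SearchModeScan", 2)); // prints "SearchModeScan-1"
  }
}
```

Named threads make the scan pool easy to pick out in thread dumps and log lines, which is the practical benefit of passing a factory instead of relying on the default `pool-N-thread-M` names.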


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3824/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181586050
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -56,6 +59,11 @@
         }
       }
     
    +  public static void shutdown() {
    --- End diff --
    
    I can't find any caller of this method.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182620251
  
    --- Diff: dev/findbugs-exclude.xml ---
    @@ -98,4 +102,9 @@
       <Match>
         <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
       </Match>
    +
    +  <Match>
    +    <!-- gRPC generated code in search module -->
    +    <Package name="org.apache.carbondata.store.protocol"/>
    +  </Match>
    --- End diff --
    
    I think these changes are not required.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    retest this please


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3918/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5153/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182620157
  
    --- Diff: common/pom.xml ---
    @@ -52,6 +52,24 @@
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-common</artifactId>
    +      <exclusions>
    +        <exclusion>
    +          <groupId>io.netty</groupId>
    +          <artifactId>netty</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>io.netty</groupId>
    +          <artifactId>netty-all</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.google.protobuf</groupId>
    +          <artifactId>protobuf-java</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.google.guava</groupId>
    +          <artifactId>guava</artifactId>
    +        </exclusion>
    +      </exclusions>
    --- End diff --
    
    Since you are no longer using gRPC, are these exclusions still required?


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5254/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5045/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5275/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3924/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2148


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182620461
  
    --- Diff: examples/spark2/pom.xml ---
    @@ -62,6 +62,11 @@
           <version>${spark.version}</version>
           <scope>${spark.deps.scope}</scope>
         </dependency>
    +    <dependency>
    +      <groupId>com.google.guava</groupId>
    +      <artifactId>guava</artifactId>
    +      <version>19.0</version>
    +    </dependency>
    --- End diff --
    
    Is this dependency required? I don't see any special imports in example classes


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181587611
  
    --- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
    @@ -134,9 +133,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
         queryModel.setTableBlockInfos(tableBlockInfoList);
         queryModel.setVectorReader(true);
         try {
    -      if (CarbonProperties.getInstance().getProperty(
    -              CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
    -              CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) {
    +      if (CarbonProperties.isSearchModeEnabled()) {
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3781/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5267/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5261/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5079/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5282/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182689498
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -365,9 +365,10 @@ protected Expression getFilterPredicates(Configuration configuration) {
         DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
         List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
         List<ExtendedBlocklet> prunedBlocklets;
    -    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
    -        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
    -        dataMapJob != null) {
    +    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
    +    if (dataMapJob != null &&
    +        distributedCG ||
    --- End diff --
    
    The logic was changed; it should be:
    
        if (dataMapJob != null &&
            (distributedCG ||
             isFgDataMapPruningEnable(job.getConfiguration()) && dataMapLevel == DataMapLevel.FG))
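
    The underlying issue is Java operator precedence: && binds tighter than ||, so without parentheses the null check guards only the first operand. A minimal sketch with plain booleans follows. The class and parameter names are hypothetical, and it assumes the truncated condition in the diff continues with the FG check, as the suggested fix implies:

```java
class PruneCondition {
  // Unparenthesized form: parsed as (hasJob && distributedCG) || (fgEnabled && isFg),
  // so the FG branch can be taken even when there is no DataMapJob.
  static boolean unparenthesized(boolean hasJob, boolean distributedCG,
      boolean fgEnabled, boolean isFg) {
    return hasJob && distributedCG || fgEnabled && isFg;
  }

  // Parenthesized form: the job check guards both alternatives.
  static boolean parenthesized(boolean hasJob, boolean distributedCG,
      boolean fgEnabled, boolean isFg) {
    return hasJob && (distributedCG || fgEnabled && isFg);
  }

  public static void main(String[] args) {
    // No DataMapJob, FG pruning enabled, FG datamap chosen.
    System.out.println(unparenthesized(false, false, true, true)); // prints "true"
    System.out.println(parenthesized(false, false, true, true));   // prints "false"
  }
}
```

    The two forms differ only when dataMapJob is null and the FG branch holds, which is exactly the case the null check is meant to exclude.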


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3907/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5146/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4360/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3903/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4997/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183005257
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---
    @@ -19,6 +19,7 @@ package org.apache.carbondata.examples.util
     
     import java.io.File
     
    +import org.apache.spark.SparkConf
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182636062
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -354,7 +365,9 @@ protected Expression getFilterPredicates(Configuration configuration) {
         DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
         List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
         List<ExtendedBlocklet> prunedBlocklets;
    -    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
    +    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
    +        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
    +        dataMapJob != null) {
    --- End diff --
    
    But I guess we should still prune once with the blocklet datamap if FG is disabled.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3811/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181587500
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -56,6 +59,11 @@
         }
       }
     
    +  public static void shutdown() {
    --- End diff --
    
    This is called in Worker.shutdown


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4886/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3809/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181589099
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.worker;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.DataMapChooser;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.protobuf.ByteString;
    +
    +/**
    + * Thread runnable for handling SearchRequest from master.
    + */
    +@InterfaceAudience.Internal
    +class SearchRequestHandler implements Runnable {
    +
    +  private static final LogService LOG =
    +      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
    +  private boolean running = true;
    +  private Queue<SearchService.SearchRequestContext> requestQueue;
    +
    +  SearchRequestHandler(Queue<SearchService.SearchRequestContext> requestQueue) {
    +    this.requestQueue = requestQueue;
    +  }
    +
    +  public void run() {
    +    while (running) {
    +      SearchService.SearchRequestContext requestContext = requestQueue.poll();
    +      if (requestContext == null) {
    +        try {
    +          Thread.sleep(10);
    +        } catch (InterruptedException e) {
    +          LOG.error(e);
    +        }
    +      } else {
    +        try {
    +          List<CarbonRow> rows = handleRequest(requestContext);
    +          sendSuccessResponse(requestContext, rows);
    +        } catch (IOException | InterruptedException e) {
    +          LOG.error(e);
    +          sendFailureResponse(requestContext, e);
    +        }
    +      }
    +    }
    +  }
    +
    +  public void stop() {
    +    running = false;
    +  }
    +
    +  /**
    +   * Builds {@link QueryModel} and read data from files
    +   */
    +  private List<CarbonRow> handleRequest(SearchService.SearchRequestContext requestContext)
    +      throws IOException, InterruptedException {
    +    SearchRequest request = requestContext.getRequest();
    +    TableInfo tableInfo = GrpcSerdes.deserialize(request.getTableInfo());
    +    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
    +    QueryModel queryModel = createQueryModel(table, request);
    +
    +    // the request contains CarbonMultiBlockSplit and reader will read multiple blocks
    +    // by using a thread pool
    +    CarbonMultiBlockSplit mbSplit = getMultiBlockSplit(request);
    +
    +    // If there is FGDataMap, prune the split by applying FGDataMap
    +    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
    --- End diff --
    
    How did you avoid pruning from the driver side? Please make sure that it does not prune twice.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3669/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3921/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5103/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181632817
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.carbondata.spark.rdd
     
     import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +import java.net.InetAddress
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4080/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181630182
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +import java.util.concurrent.{Executors, ExecutorService}
    +
    +import org.apache.spark.sql.{CarbonSession, SparkSession}
    +
    +import org.apache.carbondata.examples.util.ExampleUtils
    +
    +/**
    + * An example that demonstrate how to run queries in search mode,
    + * and compare the performance between search mode and SparkSQL
    + */
    +// scalastyle:off
    +object SearchModeExample {
    +
    +  def main(args: Array[String]) {
    +    val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
    --- End diff --
    
    Please change the app name


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181612979
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.carbondata.spark.rdd
     
     import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +import java.net.InetAddress
    --- End diff --
    
    Unused import, please remove


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3913/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182777721
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,14 +93,14 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    -        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
    +        rows.add(reader.getCurrentValue());
           }
         } catch (InterruptedException e) {
           throw new IOException(e);
         } finally {
           reader.close();
         }
    +    LOG.error("finished reading");
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4356/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5248/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5124/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3663/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4882/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4430/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3858/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181620436
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.master;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutput;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.block.Distributable;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.exception.InvalidConfigurationException;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +import org.apache.carbondata.store.protocol.EchoRequest;
    +import org.apache.carbondata.store.protocol.EchoResponse;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.protocol.ShutdownRequest;
    +import org.apache.carbondata.store.protocol.ShutdownResponse;
    +import org.apache.carbondata.store.protocol.WorkerGrpc;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.protobuf.ByteString;
    +import io.grpc.ManagedChannel;
    +import io.grpc.ManagedChannelBuilder;
    +import io.grpc.Server;
    +import io.grpc.ServerBuilder;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +
    +/**
    + * Master of CarbonSearch.
    + * It listens to {@link Master#DEFAULT_PORT} to wait for worker to register.
    + * And it provides search API to fire RPC call to workers.
    + */
    +@InterfaceAudience.Internal
    +public class Master {
    +
    +  private static final LogService LOG = LogServiceFactory.getLogService(Master.class.getName());
    +
    +  public static final int DEFAULT_PORT = 10020;
    +
    +  private Server registryServer;
    +
    +  private int port;
    +
    +  private Random random = new Random();
    +
    +  /** mapping of worker hostname to rpc stub */
    +  private Map<String, WorkerGrpc.WorkerFutureStub> workers;
    +
    +  public Master() {
    +    this(DEFAULT_PORT);
    +  }
    +
    +  public Master(int port) {
    +    this.port = port;
    +    this.workers = new ConcurrentHashMap<>();
    +  }
    +
    +  /** start service and listen on port passed in constructor */
    +  public void startService() throws IOException {
    +    if (registryServer == null) {
    +      /* The port on which the registryServer should run */
    +      registryServer = ServerBuilder.forPort(port)
    +          .addService(new RegistryService(this))
    +          .build()
    +          .start();
    +      LOG.info("Master started, listening on " + port);
    +      Runtime.getRuntime().addShutdownHook(new Thread() {
    +        @Override public void run() {
    +          // Use stderr here since the logger may have been reset by its JVM shutdown hook.
    +          LOG.info("*** shutting down gRPC Master since JVM is shutting down");
    +          stopService();
    +          LOG.info("*** Master shut down");
    +        }
    +      });
    +    }
    +  }
    +
    +  public void stopService() {
    +    if (registryServer != null) {
    +      registryServer.shutdown();
    +    }
    +  }
    +
    +  public void stopAllWorkers() throws IOException, ExecutionException, InterruptedException {
    +    ShutdownRequest request = ShutdownRequest.newBuilder()
    +        .setTrigger(ShutdownRequest.Trigger.USER)
    +        .build();
    +    for (Map.Entry<String, WorkerGrpc.WorkerFutureStub> worker : workers.entrySet()) {
    +      ListenableFuture<ShutdownResponse> future = worker.getValue().shutdown(request);
    +      ShutdownResponse response = future.get();
    +      if (response.getStatus() != ShutdownResponse.Status.SUCCESS) {
    +        LOG.error("failed to shutdown worker: " + response.getMessage());
    +        throw new IOException(response.getMessage());
    +      } else {
    +        workers.remove(worker.getKey());
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Await termination on the main thread since the grpc library uses daemon threads.
    +   */
    +  private void blockUntilShutdown() throws InterruptedException {
    +    if (registryServer != null) {
    +      registryServer.awaitTermination();
    +    }
    +  }
    +
    +  /** A new searcher is trying to register, add it to the map and connect to this searcher */
    +  void addWorker(String workerHostname, int port, int cores)
    +      throws ExecutionException, InterruptedException {
    +    Objects.requireNonNull(workerHostname);
    +
    +    LOG.info("trying to connect to searcher " + workerHostname + ":" + port);
    +    ManagedChannel channelToWorker = ManagedChannelBuilder.forAddress(workerHostname, port)
    +        .usePlaintext(true)
    +        .maxInboundMessageSize(200 * 1000 * 1000)
    +        .build();
    +    WorkerGrpc.WorkerFutureStub futureStub = WorkerGrpc.newFutureStub(channelToWorker);
    +
    +    // try to send a message to worker as a test
    +    tryEcho(futureStub);
    +    workers.put(workerHostname, futureStub);
    +  }
    +
    +  private void tryEcho(WorkerGrpc.WorkerFutureStub stub)
    +      throws ExecutionException, InterruptedException {
    +    EchoRequest request = EchoRequest.newBuilder().setMessage("hello").build();
    +    LOG.info("echo to searcher: " + request.getMessage());
    +    ListenableFuture<EchoResponse> response = stub.echo(request);
    +    try {
    +      LOG.info("echo from searcher: " + response.get().getMessage());
    +    } catch (InterruptedException | ExecutionException e) {
    +      LOG.error("failed to echo: " + e.getMessage());
    +      throw e;
    +    }
    +  }
    +
    +  /**
    +   * Execute search by firing RPC call to worker, return the result rows
    +   */
    +  public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter)
    +      throws IOException, InvalidConfigurationException, ExecutionException, InterruptedException {
    +    Objects.requireNonNull(table);
    +    Objects.requireNonNull(columns);
    +
    +    if (workers.size() == 0) {
    +      throw new IOException("No searcher is available");
    +    }
    +
    +    int queryId = random.nextInt();
    +
    +    // Build a SearchRequest
    +    SearchRequest.Builder builder = SearchRequest.newBuilder()
    +        .setQueryId(queryId)
    +        .setTableInfo(GrpcSerdes.serialize(table.getTableInfo()));
    +    for (String column : columns) {
    +      builder.addProjectColumns(column);
    +    }
    +    if (filter != null) {
    +      builder.setFilterExpression(GrpcSerdes.serialize(filter));
    +    }
    +
    +    // prune data and get a mapping of worker hostname to list of blocks,
    +    // add these blocks to the SearchRequest and fire the RPC call
    +    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter);
    +
    +    List<ListenableFuture<SearchResult>> futures = new ArrayList<>(nodeBlockMapping.size());
    +
    +    for (Map.Entry<String, List<Distributable>> entry : nodeBlockMapping.entrySet()) {
    +      String hostname = entry.getKey();
    +      List<Distributable> blocks = entry.getValue();
    +      CarbonMultiBlockSplit mbSplit = new CarbonMultiBlockSplit(blocks, hostname);
    +      ByteArrayOutputStream stream = new ByteArrayOutputStream();
    +      DataOutput dataOutput = new DataOutputStream(stream);
    +      mbSplit.write(dataOutput);
    +      builder.setSplits(ByteString.copyFrom(stream.toByteArray()));
    +
    +      SearchRequest request = builder.build();
    +
    +      // do RPC to worker asynchronously and concurrently
    +      ListenableFuture<SearchResult> future = workers.get(hostname).search(request);
    +      futures.add(future);
    +    }
    +
    +    // get all results from RPC response and return to caller
    +    List<CarbonRow> output = new LinkedList<>();
    +    for (ListenableFuture<SearchResult> future : futures) {
    +      SearchResult result = future.get();
    +      if (result.getQueryId() != queryId) {
    +        throw new IOException(String.format(
    +            "queryId in response does not match request: %d != %d", result.getQueryId(), queryId));
    +      }
    +      collectResult(result, output);
    +    }
    +    return output.toArray(new CarbonRow[output.size()]);
    +  }
    +
    +  /**
    +   * Prune data by using CarbonInputFormat.getSplit
    +   * Return a mapping of hostname to list of block
    +   */
    +  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns,
    +      Expression filter) throws IOException, InvalidConfigurationException {
    +    JobConf jobConf = new JobConf(new Configuration());
    +    Job job = new Job(jobConf);
    +    CarbonTableInputFormat<Object> format = CarbonInputFormatUtil.createCarbonTableInputFormat(
    +        job, table, columns, filter, null, null);
    +
    +    List<InputSplit> splits = format.getSplits(job);
    +    List<Distributable> distributables = new ArrayList<>(splits.size());
    +    for (InputSplit split : splits) {
    +      distributables.add(((CarbonInputSplit)split));
    +    }
    +    return CarbonLoaderUtil.nodeBlockMapping(
    +        distributables, -1, new ArrayList<String>(workers.keySet()),
    +        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
    +  }
    +
    +  /**
    +   * Fill result row to {@param output}
    +   */
    +  private void collectResult(SearchResult result,  List<CarbonRow> output) throws IOException {
    +    for (ByteString bytes : result.getRowList()) {
    +      CarbonRow row = GrpcSerdes.deserialize(bytes);
    +      output.add(row);
    +    }
    +  }
    +
    +  /** return hostname of all workers */
    +  public Set<String> getWorkers() {
    +    return workers.keySet();
    +  }
    +
    +  public static void main(String[] args) throws IOException, InterruptedException {
    --- End diff --
    
    What is the use of `main` here? Is it for testing?


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181588502
  
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---
    @@ -68,19 +78,16 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
           filter: Expression): java.util.Iterator[CarbonRow] = {
         require(path != null)
         require(projectColumns != null)
    -    val table = getTable(path)
    -    val rdd = new CarbonScanRDD[CarbonRow](
    -      spark = session,
    -      columnProjection = new CarbonProjection(projectColumns),
    -      filterExpression = filter,
    -      identifier = table.getAbsoluteTableIdentifier,
    -      serializedTableInfo = table.getTableInfo.serialize,
    -      tableInfo = table.getTableInfo,
    -      inputMetricsStats = new CarbonInputMetrics,
    -      partitionNames = null,
    -      dataTypeConverterClz = null,
    -      readSupportClz = classOf[CarbonRowReadSupport])
    -    rdd.collect
    +    scan(getTable(path), projectColumns, filter)
    +  }
    +
    +  def scan(
    +      carbonTable: CarbonTable,
    +      projectColumns: Array[String],
    --- End diff --
    
    Why not use `QueryProjection` instead of an array?


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5139/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    @ravipesala No, that is not possible in this version; we will need to change it in the next one.
    Spark 2.1 and 2.2 use Netty 4.0.x while gRPC uses Netty 4.1.x, and the two are not compatible. Spark 2.3 has upgraded to Netty 4.1.x, so we can use gRPC once we support Spark 2.3; after that we will no longer depend on the Spark EndPoint framework.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182621383
  
    --- Diff: common/pom.xml ---
    @@ -52,6 +52,24 @@
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-common</artifactId>
    +      <exclusions>
    +        <exclusion>
    +          <groupId>io.netty</groupId>
    +          <artifactId>netty</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>io.netty</groupId>
    +          <artifactId>netty-all</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.google.protobuf</groupId>
    +          <artifactId>protobuf-java</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.google.guava</groupId>
    +          <artifactId>guava</artifactId>
    +        </exclusion>
    +      </exclusions>
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182778652
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -365,9 +365,10 @@ protected Expression getFilterPredicates(Configuration configuration) {
         DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
         List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
         List<ExtendedBlocklet> prunedBlocklets;
    -    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
    -        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
    -        dataMapJob != null) {
    +    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
    +    if (dataMapJob != null &&
    +        distributedCG ||
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181613087
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -59,6 +60,8 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
     import org.apache.carbondata.processing.util.CarbonLoaderUtil
     import org.apache.carbondata.spark.InitInputMetrics
     import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
    +import org.apache.carbondata.store.master.Master
    +import org.apache.carbondata.store.worker.Worker
    --- End diff --
    
    All of the newly added imports above are unused; please remove them.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182643919
  
    --- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.ExecutionContext.Implicits.global
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5043/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182643983
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -354,7 +365,9 @@ protected Expression getFilterPredicates(Configuration configuration) {
         DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
         List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
         List<ExtendedBlocklet> prunedBlocklets;
    -    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
    +    if (isFgDataMapPruningEnable(job.getConfiguration()) &&
    +        (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) &&
    +        dataMapJob != null) {
    --- End diff --
    
    I changed the condition again, please check


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183071058
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.worker;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.DataMapChooser;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
    +
    +import org.apache.spark.search.SearchRequest;
    +import org.apache.spark.search.SearchResult;
    +import org.apache.spark.search.ShutdownRequest;
    +import org.apache.spark.search.ShutdownResponse;
    +
    +/**
    + * Thread runnable for handling SearchRequest from master.
    + */
    +@InterfaceAudience.Internal
    +public class SearchRequestHandler {
    +
    +  private static final LogService LOG =
    +      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
    +
    +  public SearchResult handleSearch(SearchRequest request) {
    +    try {
    +      List<CarbonRow> rows = handleRequest(request);
    +      return createSuccessResponse(request, rows);
    +    } catch (IOException | InterruptedException e) {
    +      LOG.error(e);
    +      return createFailureResponse(request, e);
    +    }
    +  }
    +
    +  public ShutdownResponse handleShutdown(ShutdownRequest request) {
    +    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
    +  }
    +
    +  /**
    +   * Builds {@link QueryModel} and read data from files
    +   */
    +  private List<CarbonRow> handleRequest(SearchRequest request)
    +      throws IOException, InterruptedException {
    +    TableInfo tableInfo = request.tableInfo();
    +    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
    +    QueryModel queryModel = createQueryModel(table, request);
    +    CarbonMultiBlockSplit mbSplit = request.split().value();
    +    long limit = request.limit();
    +    long rowCount = 0;
    +
    +    // If there is FGDataMap, prune the split by applying FGDataMap
    +    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
    +
    +    // In search mode, reader will read multiple blocks by using a thread pool
    +    CarbonRecordReader<CarbonRow> reader =
    +        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
    +    reader.initialize(mbSplit, null);
    +
    +    // read all rows by the reader
    +    List<CarbonRow> rows = new LinkedList<>();
    +    try {
    +      while (reader.nextKeyValue() && rowCount < limit) {
    --- End diff --
    
    We can set the default value of limit to -1 (for queries without a limit) and update the code as follows:
     while (reader.nextKeyValue() && (limit == -1 || rowCount < limit))
    I think the above code will be more readable.
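
    For the sentinel to work, the guard has to let the loop continue when limit is -1 or while the row count is still below the limit. A minimal sketch of that loop follows, using a plain `Iterator` as a stand-in for the record reader; `LimitedReadSketch`, `NO_LIMIT`, and `readRows` are hypothetical names for illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the row-reading loop; not CarbonData code. */
final class LimitedReadSketch {

  /** Sentinel meaning "no LIMIT clause" (assumed value, taken from the review comment). */
  static final long NO_LIMIT = -1;

  /** Reads rows until the source is exhausted or the limit is reached. */
  static <T> List<T> readRows(Iterator<T> reader, long limit) {
    List<T> rows = new ArrayList<>();
    long rowCount = 0;
    // When limit == NO_LIMIT the count check is skipped, so the query is unbounded.
    while ((limit == NO_LIMIT || rowCount < limit) && reader.hasNext()) {
      rows.add(reader.next());
      rowCount++;
    }
    return rows;
  }

  public static void main(String[] args) {
    List<String> source = Arrays.asList("a", "b", "c", "d");
    System.out.println(readRows(source.iterator(), 2));        // prints [a, b]
    System.out.println(readRows(source.iterator(), NO_LIMIT)); // prints [a, b, c, d]
  }
}
```

    Checking the limit before asking the reader for the next row also avoids advancing the reader past the last row that will be returned.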
    



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3829/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181586088
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -56,6 +59,11 @@
         }
       }
     
    +  public static void shutdown() {
    --- End diff --
    
    Better to override the finish method and call the ExecutorService shutdown there.
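
    A rough sketch of what shutting the pool down from an overridden finish method could look like. The classes below are hypothetical stand-ins, not the real `AbstractQueryExecutor` or `SearchModeVectorDetailQueryExecutor`, and the sketch assumes the executor owns its pool per instance, whereas the PR keeps it in a static field.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical base class with a lifecycle hook called when a query completes. */
abstract class QueryExecutorSketch {
  void finish() {
    // base-class cleanup would go here
  }
}

/** Hypothetical executor that owns its scan thread pool (an assumption). */
final class SearchExecutorSketch extends QueryExecutorSketch {
  private final ExecutorService pool = Executors.newFixedThreadPool(2);

  @Override
  void finish() {
    super.finish();
    // Stop accepting new tasks, then give running tasks a bounded time to complete.
    pool.shutdown();
    try {
      if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
        pool.shutdownNow();
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      // Preserve the interrupt status for callers further up the stack.
      Thread.currentThread().interrupt();
    }
  }

  boolean isShutdown() {
    return pool.isShutdown();
  }
}
```

    If the pool stays static and shared across queries, shutting it down in finish would affect other running queries, so the ownership question needs to be settled first.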


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181587533
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java ---
    @@ -23,51 +23,101 @@
     import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
     import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
     import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.filter.SingleTableProvider;
     import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.DataTypeConverter;
     
     public class QueryModelBuilder {
     
    -  private CarbonTable carbonTable;
    +  private CarbonTable table;
    +  private QueryProjection projection;
    +  private Expression filterExpression;
    +  private DataTypeConverter dataTypeConverter;
    +  private boolean forcedDetailRawQuery;
    +  private boolean readPageByPage;
     
    -  public QueryModelBuilder(CarbonTable carbonTable) {
    -    this.carbonTable = carbonTable;
    +  public QueryModelBuilder(CarbonTable table) {
    +    this.table = table;
       }
     
    -  public QueryModel build(String[] projectionColumnNames, Expression filterExpression) {
    -    QueryModel queryModel = QueryModel.newInstance(carbonTable);
    -    QueryProjection projection = carbonTable.createProjection(projectionColumnNames);
    -    queryModel.setProjection(projection);
    -    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
    -    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
    -    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
    -    queryModel.setIsFilterDimensions(isFilterDimensions);
    -    queryModel.setIsFilterMeasures(isFilterMeasures);
    -    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null);
    -    queryModel.setFilterExpressionResolverTree(filterIntf);
    -    return queryModel;
    +  public QueryModelBuilder projectColumns(String[] projectionColumns) {
    +    String factTableName = table.getTableName();
    +    QueryProjection projection = new QueryProjection();
    +    // fill dimensions
    +    // If columns are null, set all dimensions and measures
    --- End diff --
    
    I think it is better to throw an exception if the projection columns are null, because there is already another method, `projectAllColumns`, for users who want all columns.
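
    A minimal sketch of the suggested behaviour, using a hypothetical builder rather than the real `QueryModelBuilder`: a null projection is rejected up front, and selecting every column is only possible through the explicit method. The class name, fields, and error message are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical builder illustrating the suggestion; not the real QueryModelBuilder. */
final class ProjectionBuilderSketch {
  private final List<String> allColumns;
  private List<String> projection;

  ProjectionBuilderSketch(List<String> allColumns) {
    this.allColumns = allColumns;
  }

  /** Projects the given columns; null is an error rather than "all columns". */
  ProjectionBuilderSketch projectColumns(String[] columns) {
    // Objects.requireNonNull throws NullPointerException with the given message.
    Objects.requireNonNull(columns,
        "projection columns must not be null; use projectAllColumns() instead");
    this.projection = Arrays.asList(columns.clone());
    return this;
  }

  /** Explicit opt-in for selecting every column of the table. */
  ProjectionBuilderSketch projectAllColumns() {
    this.projection = allColumns;
    return this;
  }

  List<String> projection() {
    return projection;
  }
}
```

    Failing fast keeps the two entry points unambiguous: callers that pass null by mistake get an error instead of a silent full-table projection.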


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by kumarvishal09 <gi...@git.apache.org>.
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182754423
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---
    @@ -31,35 +31,53 @@
     import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
     import org.apache.carbondata.core.util.CarbonProperties;
     
    +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
    +
     /**
      * Below class will be used to execute the detail query and returns columnar vectors.
      */
     public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
       private static final LogService LOGGER =
               LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
    -  private static ExecutorService executorService;
    +  private static ExecutorService executorService = null;
     
       static {
    +    initThreadPool();
    +  }
    +
    +  private static synchronized void initThreadPool() {
         int nThread;
         try {
           nThread = Integer.parseInt(CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
    +              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                           CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
         } catch (NumberFormatException e) {
           nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
    -      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
    +      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
    +          + "Using the default value " + nThread);
         }
         if (nThread > 0) {
    -      executorService =  Executors.newFixedThreadPool(nThread);
    +      executorService = Executors.newFixedThreadPool(nThread);
    --- End diff --
    
    Use CarbonThreadFactory to assign a pool name when creating the executor service.
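
    To illustrate the idea without depending on CarbonData classes, here is a sketch of a fixed thread pool whose threads carry a pool name. `NamedThreadFactorySketch` is a hypothetical stand-in written against plain `java.util.concurrent`; it is not `CarbonThreadFactory`, and the naming scheme and daemon setting are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical named thread factory; a stand-in for a project-specific one. */
final class NamedThreadFactorySketch implements ThreadFactory {
  private final String poolName;
  private final AtomicInteger counter = new AtomicInteger(1);

  NamedThreadFactorySketch(String poolName) {
    this.poolName = poolName;
  }

  @Override
  public Thread newThread(Runnable task) {
    // Threads are named "<poolName>-<n>" so they are easy to spot in thread dumps.
    Thread thread = new Thread(task, poolName + "-" + counter.getAndIncrement());
    thread.setDaemon(true);
    return thread;
  }

  /** Creates a fixed-size pool whose worker threads use the given pool name. */
  static ExecutorService newNamedPool(int nThreads, String poolName) {
    return Executors.newFixedThreadPool(nThreads, new NamedThreadFactorySketch(poolName));
  }
}
```

    With named threads, scan workers can be told apart from other pools in logs and thread dumps.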



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5230/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181617779
  
    --- Diff: pom.xml ---
    @@ -121,6 +122,8 @@
         <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
         <script.exetension>.sh</script.exetension>
         <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
    +    <grpc.version>1.10.0</grpc.version>
    +    <netty.version>4.0.43.Final</netty.version>
    --- End diff --
    
    Better to use the same Netty version that Spark uses. Otherwise a lot of exclusions will be needed.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4442/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4892/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182622389
  
    --- Diff: dev/findbugs-exclude.xml ---
    @@ -98,4 +102,9 @@
       <Match>
         <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
       </Match>
    +
    +  <Match>
    +    <!-- gRPC generated code in search module -->
    +    <Package name="org.apache.carbondata.store.protocol"/>
    +  </Match>
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5134/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183077945
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.worker;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.DataMapChooser;
    +import org.apache.carbondata.core.datamap.DataMapLevel;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
    +
    +import org.apache.spark.search.SearchRequest;
    +import org.apache.spark.search.SearchResult;
    +import org.apache.spark.search.ShutdownRequest;
    +import org.apache.spark.search.ShutdownResponse;
    +
    +/**
    + * Thread runnable for handling SearchRequest from master.
    + */
    +@InterfaceAudience.Internal
    +public class SearchRequestHandler {
    +
    +  private static final LogService LOG =
    +      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
    +
    +  public SearchResult handleSearch(SearchRequest request) {
    +    try {
    +      List<CarbonRow> rows = handleRequest(request);
    +      return createSuccessResponse(request, rows);
    +    } catch (IOException | InterruptedException e) {
    +      LOG.error(e);
    +      return createFailureResponse(request, e);
    +    }
    +  }
    +
    +  public ShutdownResponse handleShutdown(ShutdownRequest request) {
    +    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
    +  }
    +
    +  /**
    +   * Builds {@link QueryModel} and read data from files
    +   */
    +  private List<CarbonRow> handleRequest(SearchRequest request)
    +      throws IOException, InterruptedException {
    +    TableInfo tableInfo = request.tableInfo();
    +    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
    +    QueryModel queryModel = createQueryModel(table, request);
    +    CarbonMultiBlockSplit mbSplit = request.split().value();
    +    long limit = request.limit();
    +    long rowCount = 0;
    +
    +    // If there is FGDataMap, prune the split by applying FGDataMap
    +    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
    +
    +    // In search mode, reader will read multiple blocks by using a thread pool
    +    CarbonRecordReader<CarbonRow> reader =
    +        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
    +    reader.initialize(mbSplit, null);
    +
    +    // read all rows by the reader
    +    List<CarbonRow> rows = new LinkedList<>();
    +    try {
    +      while (reader.nextKeyValue() && rowCount < limit) {
    --- End diff --
    
    I think using Long.MaxValue is better than using -1, because the code stays uniform.
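
The point about a uniform loop can be sketched in plain Java. This is a minimal illustration, not the PR's code: `LimitSentinel`, `readUpTo` and `NO_LIMIT` are hypothetical names, and a plain `Iterator` stands in for the record reader. With `Long.MAX_VALUE` as the "no limit" value, the same `rowCount < limit` comparison covers both the limited and the unlimited case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LimitSentinel {

  /** Hypothetical constant: "no limit" is simply the largest long value. */
  static final long NO_LIMIT = Long.MAX_VALUE;

  /** Collects at most {@code limit} items from the source iterator. */
  static <T> List<T> readUpTo(Iterator<T> source, long limit) {
    List<T> rows = new ArrayList<>();
    long rowCount = 0;
    // The limit is checked first, so no extra element is consumed once it is reached.
    while (rowCount < limit && source.hasNext()) {
      rows.add(source.next());
      rowCount++;
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    System.out.println(readUpTo(data.iterator(), 2));
    System.out.println(readUpTo(data.iterator(), NO_LIMIT));
  }
}
```

Had -1 been used for "no limit", the loop condition would need an extra `limit == -1 ||` branch wherever the limit is checked.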


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4881/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3795/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182637982
  
    --- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.ExecutionContext.Implicits.global
    --- End diff --
    
    Unused import


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3660/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181586769
  
    --- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
    @@ -134,9 +133,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
         queryModel.setTableBlockInfos(tableBlockInfoList);
         queryModel.setVectorReader(true);
         try {
    -      if (CarbonProperties.getInstance().getProperty(
    -              CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
    -              CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) {
    +      if (CarbonProperties.isSearchModeEnabled()) {
    --- End diff --
    
    I guess there is no need for the if check here. `QueryExecutorFactory` is already available, so move the code there.
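
The suggestion above, moving the mode check out of the caller and into a factory, can be sketched as follows. This is a hedged illustration only: `SketchExecutorFactory`, the two executor classes and the boolean parameter are hypothetical stand-ins, and the real `QueryExecutorFactory` signature is not shown in this thread.

```java
public class ExecutorFactorySketch {

  /** Hypothetical executor abstraction; the real interface is not shown in the thread. */
  interface QueryExecutor {
    String name();
  }

  static final class DetailExecutor implements QueryExecutor {
    @Override
    public String name() {
      return "detail";
    }
  }

  static final class SearchModeExecutor implements QueryExecutor {
    @Override
    public String name() {
      return "search-mode";
    }
  }

  /** Hypothetical factory: the only place that knows about the search-mode flag. */
  static final class SketchExecutorFactory {
    static QueryExecutor create(boolean searchModeEnabled) {
      return searchModeEnabled ? new SearchModeExecutor() : new DetailExecutor();
    }
  }

  public static void main(String[] args) {
    // Callers ask the factory and never branch on the flag themselves.
    System.out.println(SketchExecutorFactory.create(true).name());
    System.out.println(SketchExecutorFactory.create(false).name());
  }
}
```

With this shape, a record reader calls the factory once and does not need its own check of the search-mode property.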


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182622408
  
    --- Diff: examples/spark2/pom.xml ---
    @@ -62,6 +62,11 @@
           <version>${spark.version}</version>
           <scope>${spark.deps.scope}</scope>
         </dependency>
    +    <dependency>
    +      <groupId>com.google.guava</groupId>
    +      <artifactId>guava</artifactId>
    +      <version>19.0</version>
    +    </dependency>
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181633019
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +import java.util.concurrent.{Executors, ExecutorService}
    +
    +import org.apache.spark.sql.{CarbonSession, SparkSession}
    +
    +import org.apache.carbondata.examples.util.ExampleUtils
    +
    +/**
    + * An example that demonstrate how to run queries in search mode,
    + * and compare the performance between search mode and SparkSQL
    + */
    +// scalastyle:off
    +object SearchModeExample {
    +
    +  def main(args: Array[String]) {
    +    val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    retest this please


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5108/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3929/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    retest this please


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by kevinjmh <gi...@git.apache.org>.
Github user kevinjmh commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182294264
  
    --- Diff: pom.xml ---
    @@ -121,6 +122,8 @@
         <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
         <script.exetension>.sh</script.exetension>
         <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
    +    <grpc.version>1.10.0</grpc.version>
    +    <netty.version>4.0.43.Final</netty.version>
    --- End diff --
    
    I find that even the oldest version of grpc-netty depends on netty-codec-http2, which has only been available since netty 4.1.x.
    
    See http://mvnrepository.com/artifact/io.grpc/grpc-netty


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    @jackylk But Spark's RPC endpoint framework depends on Spark, so is it possible for Carbon to have a separate cluster without running Spark executors?


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3826/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5081/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5082/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3812/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181589448
  
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---
    @@ -68,19 +78,16 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
           filter: Expression): java.util.Iterator[CarbonRow] = {
         require(path != null)
         require(projectColumns != null)
    -    val table = getTable(path)
    -    val rdd = new CarbonScanRDD[CarbonRow](
    -      spark = session,
    -      columnProjection = new CarbonProjection(projectColumns),
    -      filterExpression = filter,
    -      identifier = table.getAbsoluteTableIdentifier,
    -      serializedTableInfo = table.getTableInfo.serialize,
    -      tableInfo = table.getTableInfo,
    -      inputMetricsStats = new CarbonInputMetrics,
    -      partitionNames = null,
    -      dataTypeConverterClz = null,
    -      readSupportClz = classOf[CarbonRowReadSupport])
    -    rdd.collect
    +    scan(getTable(path), projectColumns, filter)
    +  }
    +
    +  def scan(
    +      carbonTable: CarbonTable,
    +      projectColumns: Array[String],
    --- End diff --
    
    I have now removed this function and use the `scan` interface only.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181613222
  
    --- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
    @@ -26,7 +26,6 @@
     import org.apache.carbondata.common.logging.LogService;
     import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.cache.dictionary.Dictionary;
    -import org.apache.carbondata.core.constants.CarbonCommonConstants;
    --- End diff --
    
    Please remove the unused imports on lines 38 and 45.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182719725
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,10 +93,13 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        rows.add(reader.getCurrentValue());
    +        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    +        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
    --- End diff --
    
    This commit leads to a JVM crash on my local machine. After reverting it, it runs successfully.
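
The code comment in the diff above ("the reader may reuse the same buffer") describes a general hazard that can be sketched in plain Java. The names here (`BufferReuseSketch`, `ReusingReader`, `collect`) are hypothetical and do not come from CarbonData. Whether wrapping the data in a new `CarbonRow` actually copies the array depends on that constructor, which this thread does not show, and the sketch does not explain the reported JVM crash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BufferReuseSketch {

  /** Hypothetical reader that refills one shared array for every row it returns. */
  static final class ReusingReader {
    private final Object[][] source;
    private final Object[] buffer;
    private int next = 0;

    ReusingReader(Object[][] source, int width) {
      this.source = source;
      this.buffer = new Object[width];
    }

    boolean nextRow() {
      if (next >= source.length) {
        return false;
      }
      // Overwrites the previous row in place instead of allocating a new array.
      System.arraycopy(source[next], 0, buffer, 0, buffer.length);
      next++;
      return true;
    }

    Object[] current() {
      return buffer;
    }
  }

  /** Collects all rows, optionally taking a defensive copy of each one. */
  static List<Object[]> collect(ReusingReader reader, boolean copy) {
    List<Object[]> rows = new ArrayList<>();
    while (reader.nextRow()) {
      Object[] row = reader.current();
      rows.add(copy ? Arrays.copyOf(row, row.length) : row);
    }
    return rows;
  }

  public static void main(String[] args) {
    Object[][] data = {{1, 2}, {3, 4}};
    // Without copying, every list entry points at the same, last-written buffer.
    System.out.println(Arrays.toString(collect(new ReusingReader(data, 2), false).get(0)));
    // With copying, the first row keeps its own values.
    System.out.println(Arrays.toString(collect(new ReusingReader(data, 2), true).get(0)));
  }
}
```

Note that only an explicit array copy, as with `Arrays.copyOf` here, protects against reuse. Wrapping the same array reference in a new object would not.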


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5028/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5027/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4039/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181587662
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java ---
    @@ -23,51 +23,101 @@
     import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
     import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
     import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.filter.SingleTableProvider;
     import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.DataTypeConverter;
     
     public class QueryModelBuilder {
     
    -  private CarbonTable carbonTable;
    +  private CarbonTable table;
    +  private QueryProjection projection;
    +  private Expression filterExpression;
    +  private DataTypeConverter dataTypeConverter;
    +  private boolean forcedDetailRawQuery;
    +  private boolean readPageByPage;
     
    -  public QueryModelBuilder(CarbonTable carbonTable) {
    -    this.carbonTable = carbonTable;
    +  public QueryModelBuilder(CarbonTable table) {
    +    this.table = table;
       }
     
    -  public QueryModel build(String[] projectionColumnNames, Expression filterExpression) {
    -    QueryModel queryModel = QueryModel.newInstance(carbonTable);
    -    QueryProjection projection = carbonTable.createProjection(projectionColumnNames);
    -    queryModel.setProjection(projection);
    -    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
    -    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
    -    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
    -    queryModel.setIsFilterDimensions(isFilterDimensions);
    -    queryModel.setIsFilterMeasures(isFilterMeasures);
    -    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null);
    -    queryModel.setFilterExpressionResolverTree(filterIntf);
    -    return queryModel;
    +  public QueryModelBuilder projectColumns(String[] projectionColumns) {
    +    String factTableName = table.getTableName();
    +    QueryProjection projection = new QueryProjection();
    +    // fill dimensions
    +    // If columns are null, set all dimensions and measures
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4092/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4085/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181632873
  
    --- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
    @@ -26,7 +26,6 @@
     import org.apache.carbondata.common.logging.LogService;
     import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.cache.dictionary.Dictionary;
    -import org.apache.carbondata.core.constants.CarbonCommonConstants;
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4073/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r183077175
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
    @@ -110,6 +149,69 @@ class CarbonSession(@transient val sc: SparkContext,
           }
         }
       }
    +
    +  /**
    +   * If the query is a simple query with filter, we will try to use Search Mode,
    +   * otherwise execute in SparkSQL
    +   */
    +  private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
    +    val analyzed = qe.analyzed
    +    analyzed match {
    +      case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
    +        if s.child.isInstanceOf[LogicalRelation] &&
    +           s.child.asInstanceOf[LogicalRelation].relation
    +             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        runSearch(analyzed, columns, expr, s.child.asInstanceOf[LogicalRelation])
    +      case gl@GlobalLimit(_, ll@LocalLimit(_, p@Project(columns, _@Filter(expr, s: SubqueryAlias))))
    +        if s.child.isInstanceOf[LogicalRelation] &&
    +           s.child.asInstanceOf[LogicalRelation].relation
    +             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        val logicalRelation = s.child.asInstanceOf[LogicalRelation]
    +        runSearch(analyzed, columns, expr, logicalRelation, gl.maxRows, ll.maxRows)
    +      case _ =>
    +        new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
    +    }
    +  }
    +
    +  private var carbonStore: SparkCarbonStore = _
    +
    +  def startSearchMode(): Unit = {
    +    CarbonProperties.enableSearchMode(true)
    +    if (carbonStore == null) {
    +      carbonStore = new SparkCarbonStore(this)
    +      carbonStore.startSearchMode()
    +    }
    +  }
    +
    +  def stopSearchMode(): Unit = {
    +    CarbonProperties.enableSearchMode(false)
    +    if (carbonStore != null) {
    +      carbonStore.stopSearchMode()
    +      carbonStore = null
    +    }
    +  }
    +
    +  private def runSearch(
    +      logicalPlan: LogicalPlan,
    +      columns: Seq[NamedExpression],
    +      expr: Expression,
    +      relation: LogicalRelation,
    +      maxRows: Option[Long] = None,
    +      localMaxRows: Option[Long] = None): DataFrame = {
    +    val rows = carbonStore.search(
    +        relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable,
    +        columns.map(_.name).toArray,
    +        if (expr != null) CarbonFilters.transformExpression(expr) else null,
    +        maxRows.getOrElse(Long.MaxValue),
    +        localMaxRows.getOrElse(Long.MaxValue))
    --- End diff --
    
    I have added Long.MaxValue here. I think it is better than using -1.
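
As a rough Java analogue of the `getOrElse(Long.MaxValue)` calls in the diff above, the following sketch resolves an absent limit to `Long.MAX_VALUE` and contrasts the resulting check with a -1 sentinel. `LimitResolution` and its methods are hypothetical names used for illustration only.

```java
import java.util.OptionalLong;

public class LimitResolution {

  /** An absent limit becomes Long.MAX_VALUE, so callers always hold a plain long. */
  static long resolve(OptionalLong limit) {
    return limit.orElse(Long.MAX_VALUE);
  }

  /** With Long.MAX_VALUE as "no limit", one comparison covers every case. */
  static boolean mayReadMore(long rowCount, long limit) {
    return rowCount < limit;
  }

  /** With -1 as "no limit", each check needs an additional special case. */
  static boolean mayReadMoreWithMinusOne(long rowCount, long limit) {
    return limit == -1 || rowCount < limit;
  }

  public static void main(String[] args) {
    long unlimited = resolve(OptionalLong.empty());
    long ten = resolve(OptionalLong.of(10));
    System.out.println(mayReadMore(1_000_000L, unlimited));
    System.out.println(mayReadMore(10, ten));
  }
}
```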


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5024/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3857/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    retest this please


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182963146
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---
    @@ -19,6 +19,7 @@ package org.apache.carbondata.examples.util
     
     import java.io.File
     
    +import org.apache.spark.SparkConf
    --- End diff --
    
    This import is not needed.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3808/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4352/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181587317
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java ---
    @@ -91,6 +91,7 @@ public static char convertType(DataType dataType) {
           return STRING_CHAR;
         } else if (dataType == DataTypes.TIMESTAMP) {
           return TIMESTAMP_CHAR;
    +
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using gRPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3882/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5025/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182928852
  
    --- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
    @@ -93,10 +93,13 @@ public ShutdownResponse handleShutdown(ShutdownRequest request) {
         List<CarbonRow> rows = new LinkedList<>();
         try {
           while (reader.nextKeyValue()) {
    -        rows.add(reader.getCurrentValue());
    +        // copy the data as the reader may reuse the same buffer, if unsafe is enabled
    +        rows.add(new CarbonRow(reader.getCurrentValue().getData()));
           }
         } catch (InterruptedException e) {
           throw new IOException(e);
    +    } finally {
    --- End diff --
    
    I tested it: if these two lines are added, the JVM crashes. Without them, it is OK.
    But I don't know the reason.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4029/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181585854
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java ---
    @@ -91,6 +91,7 @@ public static char convertType(DataType dataType) {
           return STRING_CHAR;
         } else if (dataType == DataTypes.TIMESTAMP) {
           return TIMESTAMP_CHAR;
    +
    --- End diff --
    
    Please remove the unnecessary blank line.


---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5128/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5143/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4036/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323][WIP] Distributed search mode using...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4993/



---

[GitHub] carbondata issue #2148: [CARBONDATA-2323]Distributed search mode using RPC

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2148
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4102/



---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r182930836
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---
    @@ -417,7 +417,7 @@ object ConcurrentQueryBenchmark {
        */
       def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
         // run queries on parquet and carbon
    -    runQueries(spark, table1)
    +    //runQueries(spark, table1)
    --- End diff --
    
    We should keep this call instead of commenting it out.


---

[GitHub] carbondata pull request #2148: [CARBONDATA-2323]Distributed search mode usin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181633683
  
    --- Diff: pom.xml ---
    @@ -121,6 +122,8 @@
         <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
         <script.exetension>.sh</script.exetension>
         <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
    +    <grpc.version>1.10.0</grpc.version>
    +    <netty.version>4.0.43.Final</netty.version>
    --- End diff --
    
    Yes. I checked Spark 2.2, and it uses Netty 4.0.43; see https://github.com/apache/spark/blob/branch-2.2/pom.xml#L556
    
    Because the carbon-store-search module does not depend on Spark, I need to set the Netty version explicitly here.


---