Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/08/06 11:04:51 UTC
[4/5] carbondata git commit: [CARBONDATA-2825][CARBONDATA-2828]
CarbonStore and InternalCarbonStore API This closes #2589
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java
new file mode 100644
index 0000000..fdcffdf
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * A unit of work for the scanner in Carbon Store
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface ScanUnit extends Serializable, Writable {
+
+ /**
+ * Returns the list of preferred locations of this ScanUnit.
+ * The default return value is an empty string array, which means this ScanUnit
+ * has no location preference.
+ */
+ default String[] preferredLocations() {
+ return new String[0];
+ }
+}
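
As an illustration of the contract above, here is a minimal sketch of a custom
ScanUnit (not part of this commit; FileScanUnit and its fields are
hypothetical). The write/readFields pair comes from CarbonData's Writable
interface, and preferredLocations lets a scheduler favor the hosts that hold
the data:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.carbondata.store.devapi.ScanUnit;

public class FileScanUnit implements ScanUnit {

  private String filePath;
  private String[] hosts;

  // no-arg constructor so readFields() can rebuild the object after shipping
  public FileScanUnit() {
  }

  public FileScanUnit(String filePath, String[] hosts) {
    this.filePath = filePath;
    this.hosts = hosts;
  }

  @Override
  public String[] preferredLocations() {
    // favor the hosts holding replicas of this file
    return hosts;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(filePath);
    out.writeInt(hosts.length);
    for (String host : hosts) {
      out.writeUTF(host);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    filePath = in.readUTF();
    hosts = new String[in.readInt()];
    for (int i = 0; i < hosts.length; i++) {
      hosts[i] = in.readUTF();
    }
  }
}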
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java
new file mode 100644
index 0000000..e56eee2
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * A Scanner is used to scan the table in a distributed compute
+ * engine like Apache Spark
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface Scanner<T> extends DataScanner<T>, Pruner, Serializable {
+
+}
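
Scanner combines Pruner and DataScanner so that a compute engine can prune on
the driver, ship each ScanUnit to a task, and scan it there. A hedged usage
sketch (pruneAndScan is illustrative; ResultBatch's row-access API is not
visible in this commit, so batches are handled opaquely):

import java.util.Iterator;
import java.util.List;

import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
import org.apache.carbondata.sdk.store.exception.CarbonException;
import org.apache.carbondata.store.devapi.ResultBatch;
import org.apache.carbondata.store.devapi.ScanUnit;
import org.apache.carbondata.store.devapi.Scanner;

public class ScannerUsage {
  static <T> void pruneAndScan(Scanner<T> scanner, TableIdentifier table,
      Expression filter) throws CarbonException {
    // driver side: prune the table down to a list of scan units
    List<ScanUnit> units = scanner.prune(table, filter);
    for (ScanUnit unit : units) {
      // a real engine would ship each unit to a task picked from
      // unit.preferredLocations(); here everything runs in-process
      Iterator<? extends ResultBatch<T>> batches = scanner.scan(unit);
      while (batches.hasNext()) {
        ResultBatch<T> batch = batches.next();
        // hand each batch to downstream operators
      }
    }
  }
}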
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java
new file mode 100644
index 0000000..29b3bce
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface TransactionalOperation {
+ /**
+ * Commit the transaction when the operation succeeds
+ */
+ void commit();
+
+ /**
+ * Close the transaction when the operation fails
+ */
+ void close();
+}
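
The commit/close pair gives callers a simple success/failure protocol. A
sketch of the intended calling pattern (runTransactionally and the Runnable
work are illustrative, not part of this commit):

import org.apache.carbondata.store.devapi.TransactionalOperation;

public class TransactionalOperationUsage {
  static void runTransactionally(TransactionalOperation txn, Runnable work) {
    try {
      work.run();
      txn.commit();  // commit the transaction when the operation succeeds
    } catch (RuntimeException e) {
      txn.close();   // close the transaction when the operation fails
      throw e;
    }
  }
}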
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java b/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java
new file mode 100644
index 0000000..d7c2194
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.store.devapi.ScanUnit;
+
+/**
+ * Contains a block to scan, and the destination worker that should scan it
+ */
+@InterfaceAudience.Internal
+public class BlockScanUnit implements ScanUnit {
+
+ // the data block to scan
+ private CarbonInputSplit inputSplit;
+
+ // the worker who should scan this unit
+ private Schedulable schedulable;
+
+ public BlockScanUnit() {
+ }
+
+ public BlockScanUnit(CarbonInputSplit inputSplit, Schedulable schedulable) {
+ this.inputSplit = inputSplit;
+ this.schedulable = schedulable;
+ }
+
+ public CarbonInputSplit getInputSplit() {
+ return inputSplit;
+ }
+
+ public Schedulable getSchedulable() {
+ return schedulable;
+ }
+
+ @Override
+ public String[] preferredLocations() {
+ return inputSplit.preferredLocations();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ inputSplit.write(out);
+ schedulable.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ inputSplit = new CarbonInputSplit();
+ inputSplit.readFields(in);
+ schedulable = new Schedulable();
+ schedulable.readFields(in);
+ }
+}
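
Because BlockScanUnit keeps a no-arg constructor plus write/readFields, it can
travel from driver to worker over any byte channel. A round-trip sketch
(roundTrip is illustrative, not part of this commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.carbondata.store.impl.BlockScanUnit;

public class BlockScanUnitUsage {
  static BlockScanUnit roundTrip(BlockScanUnit original) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    // the no-arg constructor exists exactly for this deserialization path
    BlockScanUnit copy = new BlockScanUnit();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray())));
    return copy;
  }
}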
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java b/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
deleted file mode 100644
index 7e50102..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.api.CarbonInputFormat;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Provides base functionality of CarbonStore, it contains basic implementation of metadata
- * management, data pruning and data scan logic.
- */
-@InterfaceAudience.Internal
-public abstract class CarbonStoreBase implements CarbonStore {
-
- private static LogService LOGGER =
- LogServiceFactory.getLogService(CarbonStoreBase.class.getCanonicalName());
-
- MetaProcessor metaProcessor;
- private StoreConf storeConf;
-
- CarbonStoreBase(StoreConf storeConf) {
- this.storeConf = storeConf;
- this.metaProcessor = new MetaProcessor(this);
- }
-
- @Override
- public void createTable(TableDescriptor table) throws IOException, StoreException {
- Objects.requireNonNull(table);
- metaProcessor.createTable(table);
- }
-
- @Override
- public void dropTable(TableIdentifier table) throws IOException {
- Objects.requireNonNull(table);
- metaProcessor.dropTable(table);
- }
-
- @Override
- public CarbonTable getTable(TableIdentifier table) throws IOException {
- Objects.requireNonNull(table);
- return metaProcessor.getTable(table);
- }
-
- public String getTablePath(String tableName, String databaseName) {
- Objects.requireNonNull(tableName);
- Objects.requireNonNull(databaseName);
- return String.format("%s/%s", storeConf.storeLocation(), tableName);
- }
-
- /**
- * Prune data by using CarbonInputFormat.getSplit
- * Return a mapping of host address to list of block.
- * This should be invoked in driver side.
- */
- public static List<Distributable> pruneBlock(CarbonTable table, String[] columns,
- Expression filter) throws IOException {
- Objects.requireNonNull(table);
- Objects.requireNonNull(columns);
- JobConf jobConf = new JobConf(new Configuration());
- Job job = new Job(jobConf);
- CarbonTableInputFormat format;
- try {
- format = CarbonInputFormatUtil.createCarbonTableInputFormat(
- job, table, columns, filter, null, null, true);
- } catch (InvalidConfigurationException e) {
- throw new IOException(e.getMessage());
- }
-
- // We will do FG pruning in reader side, so don't do it here
- CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false);
- List<InputSplit> splits = format.getSplits(job);
- List<Distributable> blockInfos = new ArrayList<>(splits.size());
- for (InputSplit split : splits) {
- blockInfos.add((Distributable) split);
- }
- return blockInfos;
- }
-
- /**
- * Scan data and return matched rows. This should be invoked in worker side.
- * @param table carbon table
- * @param scan scan parameter
- * @return matched rows
- * @throws IOException if IO error occurs
- */
- public static List<CarbonRow> scan(CarbonTable table, Scan scan) throws IOException {
- CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
- carbonTaskInfo.setTaskId(System.nanoTime());
- ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
-
- CarbonMultiBlockSplit mbSplit = scan.getSplit();
- long limit = scan.getLimit();
- QueryModel queryModel = createQueryModel(table, scan);
-
- LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", scan.getRequestId(),
- queryModel.toString(), mbSplit.getAllSplits().size()));
-
- // read all rows by the reader
- List<CarbonRow> rows = new LinkedList<>();
- try (CarbonRecordReader<CarbonRow> reader = new IndexedRecordReader(scan.getRequestId(),
- table, queryModel)) {
- reader.initialize(mbSplit, null);
-
- // loop to read required number of rows.
- // By default, if user does not specify the limit value, limit is Long.MaxValue
- long rowCount = 0;
- while (reader.nextKeyValue() && rowCount < limit) {
- rows.add(reader.getCurrentValue());
- rowCount++;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows",
- scan.getRequestId(), rows.size()));
- return rows;
- }
-
- private static QueryModel createQueryModel(CarbonTable table, Scan scan) {
- String[] projectColumns = scan.getProjectColumns();
- Expression filter = null;
- if (scan.getFilterExpression() != null) {
- filter = scan.getFilterExpression();
- }
- return new QueryModelBuilder(table)
- .projectColumns(projectColumns)
- .filterExpression(filter)
- .build();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java
new file mode 100644
index 0000000..fc50a62
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.util.CarbonTaskInfo;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+
+public class DataOperation {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataOperation.class.getCanonicalName());
+
+ /**
+ * Scan data and return matched rows. This should be invoked on the worker side.
+ * @param tableInfo carbon table metadata
+ * @param scan scan parameter
+ * @return matched rows
+ * @throws IOException if IO error occurs
+ */
+ public static List<CarbonRow> scan(TableInfo tableInfo, ScanRequest scan) throws IOException {
+ CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+ CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
+ carbonTaskInfo.setTaskId(System.nanoTime());
+ ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
+
+ CarbonMultiBlockSplit mbSplit = scan.getSplit();
+ long limit = scan.getLimit();
+ QueryModel queryModel = createQueryModel(table, scan);
+
+ LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", scan.getRequestId(),
+ queryModel.toString(), mbSplit.getAllSplits().size()));
+
+ // read all rows by the reader
+ List<CarbonRow> rows = new LinkedList<>();
+ try (CarbonRecordReader<CarbonRow> reader = new IndexedRecordReader(scan.getRequestId(),
+ table, queryModel)) {
+ reader.initialize(mbSplit, null);
+
+ // loop to read the required number of rows.
+ // By default, if the user does not specify a limit, the limit is Long.MAX_VALUE
+ long rowCount = 0;
+ while (reader.nextKeyValue() && rowCount < limit) {
+ rows.add(reader.getCurrentValue());
+ rowCount++;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows",
+ scan.getRequestId(), rows.size()));
+ return rows;
+ }
+
+ private static QueryModel createQueryModel(CarbonTable table, ScanRequest scan) {
+ String[] projectColumns = scan.getProjectColumns();
+ Expression filter = null;
+ if (scan.getFilterExpression() != null) {
+ filter = scan.getFilterExpression();
+ }
+ return new QueryModelBuilder(table)
+ .projectColumns(projectColumns)
+ .filterExpression(filter)
+ .build();
+ }
+}
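
DataOperation.scan is the worker-side half of the local path; paired with
IndexOperation.pruneBlock (added later in this commit) it yields a complete
in-JVM scan, which is exactly how LocalCarbonStore.scan wires them. A sketch
(localScan is illustrative; the ScanRequest constructor signature is taken
from its usage in LocalCarbonStore):

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.store.impl.DataOperation;
import org.apache.carbondata.store.impl.IndexOperation;
import org.apache.carbondata.store.impl.service.model.ScanRequest;

public class DataOperationUsage {
  static List<CarbonRow> localScan(TableInfo tableInfo, String[] projection,
      Expression filter, long limit) throws IOException {
    // prune first, then scan all surviving blocks in this JVM
    List<CarbonInputSplit> blocks = IndexOperation.pruneBlock(tableInfo, filter);
    CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, new String[0]);
    ScanRequest request =
        new ScanRequest(0, split, tableInfo, projection, filter, limit);
    return DataOperation.scan(tableInfo, request);
  }
}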
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java
new file mode 100644
index 0000000..8d99c3e
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.ServiceFactory;
+
+class DataServicePool {
+ private static final Map<Schedulable, DataService> dataServicePool = new ConcurrentHashMap<>();
+
+ private DataServicePool() {
+ }
+
+ static DataService getOrCreateDataService(Schedulable schedulable) throws IOException {
+ DataService service = dataServicePool.getOrDefault(
+ schedulable,
+ ServiceFactory.createDataService(schedulable.getAddress(), schedulable.getPort()));
+ dataServicePool.putIfAbsent(schedulable, service);
+ return service;
+ }
+
+}
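
Note that getOrDefault evaluates its second argument eagerly, so
ServiceFactory.createDataService runs on every call even when the pool already
holds a service for that Schedulable. A computeIfAbsent-based variant of the
method body avoids that; a sketch, assuming it is acceptable to tunnel the
checked IOException through java.io.UncheckedIOException:

  static DataService getOrCreateDataService(Schedulable schedulable) throws IOException {
    try {
      return dataServicePool.computeIfAbsent(schedulable, s -> {
        try {
          // only runs when no service is cached yet for this schedulable
          return ServiceFactory.createDataService(s.getAddress(), s.getPort());
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (UncheckedIOException e) {
      throw e.getCause();
    }
  }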
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java
new file mode 100644
index 0000000..bd40667
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.devapi.Scanner;
+
+public class DelegatedScanner<T> implements Scanner<T> {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DelegatedScanner.class.getCanonicalName());
+
+ private Pruner pruner;
+ private DataScanner<T> scanner;
+
+ public DelegatedScanner(Pruner pruner, DataScanner<T> scanner) {
+ this.pruner = pruner;
+ this.scanner = scanner;
+ }
+
+ @Override
+ public List<ScanUnit> prune(TableIdentifier table, Expression filterExpression)
+ throws CarbonException {
+ return pruner.prune(table, filterExpression);
+ }
+
+ @Override
+ public Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException {
+ return scanner.scan(input);
+ }
+
+}
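
DelegatedScanner simply composes an independently chosen Pruner and
DataScanner, so any pairing works: remote prune with local scan, local prune
with remote scan, and so on. A sketch (compose is illustrative;
InternalCarbonStoreImpl below builds the same combinations):

import org.apache.carbondata.store.devapi.DataScanner;
import org.apache.carbondata.store.devapi.Pruner;
import org.apache.carbondata.store.devapi.Scanner;
import org.apache.carbondata.store.impl.DelegatedScanner;

public class DelegatedScannerUsage {
  static <T> Scanner<T> compose(Pruner pruner, DataScanner<T> scanner) {
    // prune() calls go to the pruner, scan() calls to the data scanner
    return new DelegatedScanner<>(pruner, scanner);
  }
}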
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
deleted file mode 100644
index 3667aea..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
-import org.apache.carbondata.processing.util.CarbonLoaderUtil;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.exception.ExecutionTimeoutException;
-import org.apache.carbondata.store.api.exception.StoreException;
-import org.apache.carbondata.store.impl.master.Schedulable;
-import org.apache.carbondata.store.impl.master.Scheduler;
-import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
-import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-
-/**
- * A CarbonStore that leverage multiple servers via RPC calls (Master and Workers)
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-class DistributedCarbonStore extends CarbonStoreBase {
- private static LogService LOGGER =
- LogServiceFactory.getLogService(DistributedCarbonStore.class.getCanonicalName());
- private SegmentTxnManager txnManager;
- private Scheduler scheduler;
- private Random random = new Random();
-
- DistributedCarbonStore(StoreConf storeConf) throws IOException {
- super(storeConf);
- this.scheduler = new Scheduler(storeConf);
- txnManager = SegmentTxnManager.getInstance();
- }
-
- @Override
- public void loadData(LoadDescriptor load) throws IOException, StoreException {
- Objects.requireNonNull(load);
- CarbonTable table = metaProcessor.getTable(load.getTable());
- CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
- builder.setInputPath(load.getInputPath());
- CarbonLoadModel loadModel;
- try {
- loadModel = builder.build(load.getOptions(), System.currentTimeMillis(), "0");
- } catch (InvalidLoadOptionException e) {
- LOGGER.error(e, "Invalid loadDescriptor options");
- throw new StoreException(e);
- } catch (IOException e) {
- LOGGER.error(e, "Failed to loadDescriptor data");
- throw e;
- }
-
- Schedulable worker = scheduler.pickNexWorker();
- try {
- if (loadModel.getFactTimeStamp() == 0) {
- loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime());
- }
- txnManager.openSegment(loadModel, load.isOverwrite());
- LoadDataRequest request = new LoadDataRequest(loadModel);
- BaseResponse response = scheduler.sendRequest(worker, request);
- if (Status.SUCCESS.ordinal() == response.getStatus()) {
- txnManager.commitSegment(loadModel);
- } else {
- txnManager.closeSegment(loadModel);
- throw new StoreException(response.getMessage());
- }
- } finally {
- worker.workload.decrementAndGet();
- }
- }
-
- @Override
- public List<CarbonRow> select(SelectDescriptor select) throws IOException, StoreException {
- Objects.requireNonNull(select);
- CarbonTable carbonTable = metaProcessor.getTable(select.getTable());
- return select(
- carbonTable,
- select.getProjection(),
- select.getFilter(),
- select.getLimit(),
- select.getLimit());
- }
-
- /**
- * Execute search by firing RPC call to worker, return the result rows
- *
- * @param table table to search
- * @param columns projection column names
- * @param filter filter expression
- * @param globalLimit max number of rows required in Master
- * @param localLimit max number of rows required in Worker
- * @return CarbonRow
- */
- private List<CarbonRow> select(CarbonTable table, String[] columns, Expression filter,
- long globalLimit, long localLimit) throws IOException {
- Objects.requireNonNull(table);
- Objects.requireNonNull(columns);
- if (globalLimit < 0 || localLimit < 0) {
- throw new IllegalArgumentException("limit should be positive");
- }
-
- int queryId = random.nextInt();
-
- List<CarbonRow> output = new ArrayList<>();
-
- // prune data and get a mapping of worker hostname to list of blocks,
- // then add these blocks to the Scan and fire the RPC call
- List<Distributable> blockInfos = pruneBlock(table, columns, filter);
-
- Map<String, List<Distributable>> nodeBlockMapping =
- CarbonLoaderUtil.nodeBlockMapping(
- blockInfos, -1, scheduler.getAllWorkerAddresses(),
- CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
-
- Set<Map.Entry<String, List<Distributable>>> entries = nodeBlockMapping.entrySet();
- List<Future<QueryResponse>> futures = new ArrayList<>(entries.size());
- List<Schedulable> workers = new ArrayList<>(entries.size());
- for (Map.Entry<String, List<Distributable>> entry : entries) {
- CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(entry.getValue(), entry.getKey());
- Scan scan =
- new Scan(queryId, split, table.getTableInfo(), columns, filter, localLimit);
-
- // Find an Endpoind and send the request to it
- // This RPC is non-blocking so that we do not need to wait before send to next worker
- Schedulable worker = scheduler.pickWorker(entry.getKey());
- workers.add(worker);
- futures.add(scheduler.sendRequestAsync(worker, scan));
- }
-
- int rowCount = 0;
- int length = futures.size();
- for (int i = 0; i < length; i++) {
- Future<QueryResponse> future = futures.get(i);
- Schedulable worker = workers.get(i);
- if (rowCount < globalLimit) {
- // wait for worker
- QueryResponse response = null;
- try {
- response = future
- .get((long) (CarbonProperties.getInstance().getQueryTimeout()), TimeUnit.SECONDS);
- } catch (ExecutionException | InterruptedException e) {
- throw new IOException("exception in worker: " + e.getMessage());
- } catch (TimeoutException t) {
- throw new ExecutionTimeoutException();
- } finally {
- worker.workload.decrementAndGet();
- }
- LOGGER.info("[QueryId: " + queryId + "] receive search response from worker " + worker);
- rowCount += onSuccess(queryId, response, output, globalLimit);
- }
- }
- return output;
- }
-
- private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> output, long globalLimit)
- throws IOException {
- // in case of RPC success, collect all rows in response message
- if (result.getQueryId() != queryId) {
- throw new IOException(
- "queryId in response does not match request: " + result.getQueryId() + " != " + queryId);
- }
- if (result.getStatus() != Status.SUCCESS.ordinal()) {
- throw new IOException("failure in worker: " + result.getMessage());
- }
- int rowCount = 0;
- Object[][] rows = result.getRows();
- for (Object[] row : rows) {
- output.add(new CarbonRow(row));
- rowCount++;
- if (rowCount >= globalLimit) {
- break;
- }
- }
- LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + rowCount);
- return rowCount;
- }
-
- @Override
- public void close() throws IOException {
- LOGGER.info("Shutting down all workers...");
- scheduler.stopAllWorkers();
- LOGGER.info("All workers are shut down");
- try {
- LOGGER.info("Stopping master...");
- scheduler.stopService();
- LOGGER.info("Master stopped");
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java
new file mode 100644
index 0000000..49afb45
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class IndexOperation {
+
+ /**
+ * Prune data by leveraging Carbon's Index
+ */
+ public static List<CarbonInputSplit> pruneBlock(TableInfo tableInfo, Expression filter)
+ throws IOException {
+ Objects.requireNonNull(tableInfo);
+ CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+ JobConf jobConf = new JobConf(new Configuration());
+ Job job = new Job(jobConf);
+ CarbonTableInputFormat format;
+ try {
+ // We only want to do pruning, so pass empty projection columns
+ format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+ job, table, new String[0], filter, null, null, true);
+ } catch (InvalidConfigurationException e) {
+ throw new IOException(e.getMessage());
+ }
+
+ // We will do FG pruning on the reader side, so don't do it here
+ CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false);
+ return format.getSplits(job);
+ }
+}
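
A pruning-only usage sketch. The filter below is built with carbondata-core's
expression classes (ColumnExpression, LiteralExpression, EqualToExpression),
which are assumptions here since they are not part of this commit; it is
equivalent to the predicate city = 'shanghai':

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.store.impl.IndexOperation;

public class IndexOperationUsage {
  static void printPreferredHosts(TableInfo tableInfo) throws IOException {
    Expression filter = new EqualToExpression(
        new ColumnExpression("city", DataTypes.STRING),
        new LiteralExpression("shanghai", DataTypes.STRING));
    List<CarbonInputSplit> splits = IndexOperation.pruneBlock(tableInfo, filter);
    for (CarbonInputSplit split : splits) {
      // each split reports the hosts holding its data
      System.out.println(Arrays.toString(split.preferredLocations()));
    }
  }
}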
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
index 64f0742..c856d18 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
@@ -65,7 +65,7 @@ class IndexedRecordReader extends CarbonRecordReader<CarbonRow> {
private int queryId;
private CarbonTable table;
- public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) {
+ IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) {
super(queryModel, new CarbonRowReadSupport());
this.queryId = queryId;
this.table = table;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java
new file mode 100644
index 0000000..4821116
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.sdk.store.DistributedCarbonStore;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataLoader;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.InternalCarbonStore;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ScanOption;
+import org.apache.carbondata.store.devapi.Scanner;
+
+/**
+ * This store does prune and scan either remotely, by sending RPCs to the Master/Workers,
+ * or in the local JVM, depending on the scan options passed.
+ */
+@InterfaceAudience.Internal
+public class InternalCarbonStoreImpl extends DistributedCarbonStore implements InternalCarbonStore {
+
+ private Map<TableIdentifier, CarbonTable> tableCache = new ConcurrentHashMap<>();
+ private StoreConf storeConf;
+
+ public InternalCarbonStoreImpl(StoreConf storeConf) throws IOException {
+ super(storeConf);
+ this.storeConf = storeConf;
+ }
+
+ @Override
+ public CarbonTable getCarbonTable(TableIdentifier tableIdentifier)
+ throws CarbonException {
+ Objects.requireNonNull(tableIdentifier);
+ CarbonTable carbonTable = tableCache.getOrDefault(
+ tableIdentifier,
+ CarbonTable.buildFromTableInfo(storeService.getTable(tableIdentifier)));
+ tableCache.putIfAbsent(tableIdentifier, carbonTable);
+ return carbonTable;
+ }
+
+ @Override
+ public DataLoader newLoader(LoadDescriptor load) throws CarbonException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * By default, it returns a Scanner that prunes and scans remotely
+ */
+ @Override
+ public <T> Scanner<T> newScanner(TableIdentifier identifier, ScanDescriptor scanDescriptor,
+ Map<String, String> scanOption, CarbonReadSupport<T> readSupport)
+ throws CarbonException {
+ Objects.requireNonNull(identifier);
+ Objects.requireNonNull(scanDescriptor);
+ boolean isRemotePrune;
+ boolean isOpPushdown;
+ if (scanOption == null) {
+ isRemotePrune = true;
+ isOpPushdown = true;
+ } else {
+ isRemotePrune = ScanOption.isRemotePrune(scanOption);
+ isOpPushdown = ScanOption.isOperatorPushdown(scanOption);
+ }
+
+ TableInfo tableInfo = MetaOperation.getTable(identifier, storeConf);
+ Pruner pruner;
+ DataScanner<T> scanner;
+
+ if (isRemotePrune) {
+ pruner = new RemotePruner(storeConf.masterHost(), storeConf.pruneServicePort());
+ } else {
+ pruner = new LocalPruner(storeConf);
+ }
+ if (isOpPushdown) {
+ scanner = new RemoteDataScanner<>(tableInfo, scanDescriptor, scanOption, readSupport);
+ } else {
+ scanner = new LocalDataScanner<>(storeConf, scanDescriptor, scanOption);
+ }
+
+ return new DelegatedScanner<>(pruner, scanner);
+ }
+
+}
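
newScanner picks one of four prune/scan combinations from the option map. A
hedged sketch requesting local prune with pushed-down (remote) scan; the
concrete string keys read by ScanOption.isRemotePrune and
ScanOption.isOperatorPushdown are not visible in this commit, so the keys
below are hypothetical placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
import org.apache.carbondata.sdk.store.exception.CarbonException;
import org.apache.carbondata.store.devapi.InternalCarbonStore;
import org.apache.carbondata.store.devapi.Scanner;

public class InternalStoreUsage {
  static Scanner<CarbonRow> localPruneRemoteScan(InternalCarbonStore store,
      TableIdentifier table, ScanDescriptor scan,
      CarbonReadSupport<CarbonRow> readSupport) throws CarbonException {
    Map<String, String> options = new HashMap<>();
    options.put("carbon.scan.remote.prune", "false");      // hypothetical key
    options.put("carbon.scan.operator.pushdown", "true");  // hypothetical key
    return store.newScanner(table, scan, options, readSupport);
  }
}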
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
index 40b6bdf..bd8fafe 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.store.impl;
import java.io.IOException;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -28,11 +29,13 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.StructType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
@@ -40,12 +43,17 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.exception.StoreException;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.KeyedRow;
+import org.apache.carbondata.sdk.store.PrimaryKey;
+import org.apache.carbondata.sdk.store.Row;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -63,7 +71,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
*/
@InterfaceAudience.User
@InterfaceStability.Unstable
-class LocalCarbonStore extends CarbonStoreBase {
+public class LocalCarbonStore extends MetaOperation implements CarbonStore {
private static final LogService LOGGER =
LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
@@ -72,11 +80,11 @@ class LocalCarbonStore extends CarbonStoreBase {
private Configuration configuration;
private SegmentTxnManager txnManager;
- LocalCarbonStore(StoreConf storeConf) {
+ public LocalCarbonStore(StoreConf storeConf) {
this(storeConf, new Configuration());
}
- LocalCarbonStore(StoreConf storeConf, Configuration hadoopConf) {
+ private LocalCarbonStore(StoreConf storeConf, Configuration hadoopConf) {
super(storeConf);
this.storeConf = storeConf;
this.txnManager = SegmentTxnManager.getInstance();
@@ -84,17 +92,20 @@ class LocalCarbonStore extends CarbonStoreBase {
}
@Override
- public void loadData(LoadDescriptor load) throws IOException, StoreException {
+ public void loadData(LoadDescriptor load) throws CarbonException {
Objects.requireNonNull(load);
- CarbonTable table = metaProcessor.getTable(load.getTable());
- CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
- modelBuilder.setInputPath(load.getInputPath());
CarbonLoadModel loadModel;
try {
+ TableInfo tableInfo = getTable(load.getTable(), storeConf);
+ CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+ CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
+ modelBuilder.setInputPath(load.getInputPath());
loadModel = modelBuilder.build(load.getOptions(), System.currentTimeMillis(), "0");
} catch (InvalidLoadOptionException e) {
LOGGER.error(e, "Invalid loadDescriptor options");
- throw new StoreException(e.getMessage());
+ throw new CarbonException(e);
+ } catch (IOException e) {
+ throw new CarbonException(e);
}
if (loadModel.getFactTimeStamp() == 0) {
@@ -106,12 +117,27 @@ class LocalCarbonStore extends CarbonStoreBase {
loadData(loadModel);
txnManager.commitSegment(loadModel);
} catch (Exception e) {
- txnManager.closeSegment(loadModel);
LOGGER.error(e, "Failed to load data");
- throw new StoreException(e);
+ try {
+ txnManager.closeSegment(loadModel);
+ } catch (IOException ex) {
+ LOGGER.error(ex, "Failed to close segment");
+ // Ignoring the exception
+ }
+ throw new CarbonException(e);
}
}
+ @Override
+ public void upsert(Iterator<KeyedRow> row, StructType schema) throws CarbonException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void delete(Iterator<PrimaryKey> keys) throws CarbonException {
+ throw new UnsupportedOperationException();
+ }
+
private void loadData(CarbonLoadModel model) throws Exception {
DataLoadExecutor executor = null;
try {
@@ -146,19 +172,34 @@ class LocalCarbonStore extends CarbonStoreBase {
}
@Override
- public List<CarbonRow> select(SelectDescriptor select) throws IOException {
- Objects.requireNonNull(select);
- CarbonTable table = metaProcessor.getTable(select.getTable());
- List<Distributable> blocks = pruneBlock(table, select.getProjection(), select.getFilter());
- CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, "");
- Scan scan = new Scan(
- 0, split, table.getTableInfo(), select.getProjection(), select.getFilter(),
- select.getLimit());
- return scan(table, scan);
+ public List<CarbonRow> scan(ScanDescriptor scanDescriptor) throws CarbonException {
+ Objects.requireNonNull(scanDescriptor);
+ try {
+ TableInfo tableInfo = getTable(scanDescriptor.getTableIdentifier(), storeConf);
+ List<CarbonInputSplit> blocks =
+ IndexOperation.pruneBlock(tableInfo, scanDescriptor.getFilter());
+ CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, new String[0]);
+ ScanRequest scan =
+ new ScanRequest(0, split, tableInfo, scanDescriptor.getProjection(),
+ scanDescriptor.getFilter(), scanDescriptor.getLimit());
+ return DataOperation.scan(tableInfo, scan);
+ } catch (IOException e) {
+ throw new CarbonException(e);
+ }
+ }
+
+ @Override
+ public Row lookup(PrimaryKey key) throws CarbonException {
+ throw new UnsupportedOperationException();
}
@Override
- public void close() throws IOException {
+ public List<Row> lookup(TableIdentifier tableIdentifier, String filterExpression)
+ throws CarbonException {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void close() throws IOException {
}
}
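
End-to-end usage of the reworked LocalCarbonStore, as a sketch: the
LoadDescriptor and ScanDescriptor are assumed to be constructed elsewhere,
since their builders are not part of this hunk:

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.sdk.store.conf.StoreConf;
import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
import org.apache.carbondata.sdk.store.exception.CarbonException;
import org.apache.carbondata.store.impl.LocalCarbonStore;

public class LocalStoreUsage {
  static List<CarbonRow> loadThenScan(StoreConf conf, LoadDescriptor load,
      ScanDescriptor scan) throws CarbonException, IOException {
    LocalCarbonStore store = new LocalCarbonStore(conf);
    try {
      store.loadData(load);    // opens a segment, loads the data, commits
      return store.scan(scan); // prunes and scans entirely in this JVM
    } finally {
      store.close();
    }
  }
}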
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java
new file mode 100644
index 0000000..c3429a4
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+
+/**
+ * This scanner scans in the local JVM
+ * @param <T> scan output
+ */
+public class LocalDataScanner<T> implements DataScanner<T> {
+
+ private StoreConf storeConf;
+ private ScanDescriptor scanDescriptor;
+ private Map<String, String> scanOption;
+
+ LocalDataScanner(StoreConf storeConf, ScanDescriptor scanDescriptor,
+ Map<String, String> scanOption) {
+ this.storeConf = storeConf;
+ this.scanDescriptor = scanDescriptor;
+ this.scanOption = scanOption;
+ }
+
+ @Override
+ public Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException {
+ Objects.requireNonNull(scanDescriptor);
+ try {
+ TableInfo tableInfo = MetaOperation.getTable(scanDescriptor.getTableIdentifier(), storeConf);
+ List<CarbonInputSplit> blocks =
+ IndexOperation.pruneBlock(tableInfo, scanDescriptor.getFilter());
+ CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, new String[0]);
+ ScanRequest scan =
+ new ScanRequest(0, split, tableInfo, scanDescriptor.getProjection(),
+ scanDescriptor.getFilter(), scanDescriptor.getLimit());
+ List<T> rows = (List<T>) DataOperation.scan(tableInfo, scan);
+ RowMajorResultBatch<T> resultBatch = new RowMajorResultBatch<>(rows);
+ return Collections.singletonList(resultBatch).iterator();
+ } catch (IOException e) {
+ throw new CarbonException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java
new file mode 100644
index 0000000..734072c
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ScanUnit;
+
+public class LocalPruner implements Pruner {
+
+ private StoreConf storeConf;
+
+ LocalPruner(StoreConf storeConf) {
+ this.storeConf = storeConf;
+ }
+
+ @Override
+ public List<ScanUnit> prune(TableIdentifier identifier, Expression filterExpression)
+ throws CarbonException {
+ try {
+ TableInfo table = MetaOperation.getTable(identifier, storeConf);
+ List<CarbonInputSplit> splits = IndexOperation.pruneBlock(table, filterExpression);
+ return splits.stream().map(
+ (Function<CarbonInputSplit, ScanUnit>) inputSplit ->
+ // LocalDataScanner scans in the local JVM and sends no RPC to a
+ // schedulable (Worker), so the schedulable can be null here
+ new BlockScanUnit(inputSplit, null)
+ ).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new CarbonException(e);
+ }
+ }
+}
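
Taken together, LocalPruner and LocalDataScanner give a purely in-process scan
path. A minimal caller sketch (hypothetical names: `conf` is a prepared
StoreConf and `desc` a ScanDescriptor; both constructors are package-private,
so such a caller would sit in org.apache.carbondata.store.impl, and exception
handling is omitted):

    Pruner pruner = new LocalPruner(conf);
    List<ScanUnit> units = pruner.prune(desc.getTableIdentifier(), desc.getFilter());

    DataScanner<Object[]> scanner = new LocalDataScanner<>(conf, desc, new HashMap<>());
    for (ScanUnit unit : units) {
      Iterator<? extends ResultBatch<Object[]>> batches = scanner.scan(unit);
      while (batches.hasNext()) {
        ResultBatch<Object[]> batch = batches.next();
        while (batch.hasNext()) {
          Object[] row = batch.next(); // one projected row
        }
      }
    }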
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
new file mode 100644
index 0000000..3ca1ffe
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * Provides table metadata management: create, drop, and schema lookup.
+ */
+@InterfaceAudience.Internal
+public class MetaOperation {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(MetaOperation.class.getCanonicalName());
+
+ private StoreConf storeConf;
+
+ // mapping of table path to TableInfo object; a concurrent map since the
+ // cache is static and shared across threads and MetaOperation instances
+ private static final Map<String, TableInfo> cache = new ConcurrentHashMap<>();
+
+ public MetaOperation(StoreConf storeConf) {
+ this.storeConf = storeConf;
+ }
+
+ public void createTable(TableDescriptor descriptor) throws CarbonException {
+ TableIdentifier table = descriptor.getTable();
+ Field[] fields = descriptor.getSchema().getFields();
+ // sort_columns
+ List<String> sortColumnsList = null;
+ try {
+ sortColumnsList = descriptor.getSchema().prepareSortColumns(descriptor.getProperties());
+ } catch (MalformedCarbonCommandException e) {
+ throw new CarbonException(e);
+ }
+ ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
+
+ TableSchemaBuilder builder = TableSchema.builder();
+ CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
+
+ TableSchema schema = builder.tableName(table.getTableName())
+ .properties(descriptor.getProperties())
+ .setSortColumns(Arrays.asList(sortColumnsSchemaList))
+ .build();
+
+ SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+ schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+ schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
+ schema.setTableName(table.getTableName());
+
+ String tablePath = descriptor.getTablePath();
+ if (tablePath == null) {
+ tablePath = getTablePath(table.getTableName(), table.getDatabaseName());
+ }
+
+ TableInfo tableInfo = CarbonTable.builder()
+ .databaseName(table.getDatabaseName())
+ .tableName(table.getTableName())
+ .tablePath(tablePath)
+ .tableSchema(schema)
+ .isTransactionalTable(true)
+ .buildTableInfo();
+
+ try {
+ createTable(tableInfo, descriptor.isIfNotExists());
+ } catch (IOException e) {
+ LOGGER.error(e, "create table failed");
+ throw new CarbonException(e);
+ }
+ }
+
+ private void createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException {
+ AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
+ boolean tableExists = FileFactory.isFileExist(identifier.getTablePath());
+ if (tableExists) {
+ if (ifNotExists) {
+ return;
+ } else {
+ throw new IOException(
+ "car't create table " + tableInfo.getDatabaseName() + "." + tableInfo.getFactTable()
+ .getTableName() + ", because it already exists");
+ }
+ }
+
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ String databaseName = tableInfo.getDatabaseName();
+ String tableName = tableInfo.getFactTable().getTableName();
+ org.apache.carbondata.format.TableInfo thriftTableInfo =
+ schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName);
+
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+ String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
+ FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
+ try {
+ if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+ boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType);
+ if (!isDirCreated) {
+ throw new IOException("Failed to create the metadata directory " + schemaMetadataPath);
+ }
+ }
+ ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+ thriftWriter.open(FileWriteOperation.OVERWRITE);
+ thriftWriter.write(thriftTableInfo);
+ thriftWriter.close();
+ } catch (IOException e) {
+ LOGGER.error(e, "Failed to handle create table");
+ throw e;
+ }
+ }
+
+ public void dropTable(TableIdentifier table) throws CarbonException {
+ String tablePath = getTablePath(table.getTableName(), table.getDatabaseName());
+ cache.remove(tablePath);
+ try {
+ FileFactory.deleteFile(tablePath);
+ } catch (IOException e) {
+ throw new CarbonException(e);
+ }
+ }
+
+ public TableInfo getTable(TableIdentifier table) throws CarbonException {
+ return getTable(table, storeConf);
+ }
+
+ public static TableInfo getTable(TableIdentifier table, StoreConf storeConf)
+ throws CarbonException {
+ String tablePath = getTablePath(table.getTableName(), table.getDatabaseName(), storeConf);
+ if (cache.containsKey(tablePath)) {
+ return cache.get(tablePath);
+ } else {
+ org.apache.carbondata.format.TableInfo formatTableInfo = null;
+ try {
+ formatTableInfo = CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
+ } catch (IOException e) {
+ throw new CarbonException(e);
+ }
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ formatTableInfo, table.getDatabaseName(), table.getTableName(), tablePath);
+ tableInfo.setTablePath(tablePath);
+ cache.put(tablePath, tableInfo);
+ return tableInfo;
+ }
+ }
+
+ public List<TableDescriptor> listTable() throws CarbonException {
+ throw new UnsupportedOperationException();
+ }
+
+ public TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException {
+ throw new UnsupportedOperationException();
+ }
+
+ public String getTablePath(String tableName, String databaseName) {
+ return getTablePath(tableName, databaseName, storeConf);
+ }
+
+ public static String getTablePath(String tableName, String databaseName, StoreConf storeConf) {
+ Objects.requireNonNull(tableName);
+ Objects.requireNonNull(databaseName);
+ // Note: databaseName is validated but not yet part of the path, so all
+ // databases currently share one folder per table name under the store location
+ return String.format("%s/%s", storeConf.storeLocation(), tableName);
+ }
+}
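
For reference, a hedged usage sketch of MetaOperation (assuming `storeConf`
and a TableDescriptor `desc` built elsewhere via the SDK; CarbonException
handling omitted):

    MetaOperation meta = new MetaOperation(storeConf);
    meta.createTable(desc);                          // no-op when the table exists and ifNotExists is set
    TableInfo info = meta.getTable(desc.getTable()); // cached after the first schema read
    meta.dropTable(desc.getTable());                 // evicts the cache entry and deletes the table path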
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
deleted file mode 100644
index 6d03711..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.TableSchema;
-import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.ThriftWriter;
-import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-class MetaProcessor {
-
- private static LogService LOGGER =
- LogServiceFactory.getLogService(MetaProcessor.class.getCanonicalName());
-
- private CarbonStoreBase store;
-
- MetaProcessor(CarbonStoreBase store) {
- this.store = store;
- }
-
- // mapping of table path to CarbonTable object
- private Map<String, CarbonTable> cache = new HashMap<>();
-
- public void createTable(TableDescriptor descriptor) throws StoreException {
- Field[] fields = descriptor.getSchema().getFields();
- // sort_columns
- List<String> sortColumnsList = null;
- try {
- sortColumnsList = descriptor.getSchema().prepareSortColumns(descriptor.getProperties());
- } catch (MalformedCarbonCommandException e) {
- throw new StoreException(e.getMessage());
- }
- ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
-
- TableSchemaBuilder builder = TableSchema.builder();
- CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
-
- TableSchema schema = builder.tableName(descriptor.getTable().getTableName())
- .properties(descriptor.getProperties())
- .setSortColumns(Arrays.asList(sortColumnsSchemaList))
- .build();
-
- SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
- schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
- schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
- schema.setTableName(descriptor.getTable().getTableName());
-
- String tablePath = descriptor.getTablePath();
- if (tablePath == null) {
- tablePath = store.getTablePath(
- descriptor.getTable().getTableName(), descriptor.getTable().getDatabaseName());
- }
-
- TableInfo tableInfo = CarbonTable.builder()
- .databaseName(descriptor.getTable().getDatabaseName())
- .tableName(descriptor.getTable().getTableName())
- .tablePath(tablePath)
- .tableSchema(schema)
- .isTransactionalTable(true)
- .buildTableInfo();
-
- try {
- createTable(tableInfo, descriptor.isIfNotExists());
- } catch (IOException e) {
- LOGGER.error(e, "create tableDescriptor failed");
- throw new StoreException(e.getMessage());
- }
- }
-
- private void createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException {
- AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
- boolean tableExists = FileFactory.isFileExist(identifier.getTablePath());
- if (tableExists) {
- if (ifNotExists) {
- return;
- } else {
- throw new IOException(
- "car't create table " + tableInfo.getDatabaseName() + "." + tableInfo.getFactTable()
- .getTableName() + ", because it already exists");
- }
- }
-
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- String databaseName = tableInfo.getDatabaseName();
- String tableName = tableInfo.getFactTable().getTableName();
- org.apache.carbondata.format.TableInfo thriftTableInfo =
- schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName);
-
- String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
- String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
- FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
- try {
- if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
- boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType);
- if (!isDirCreated) {
- throw new IOException("Failed to create the metadata directory " + schemaMetadataPath);
- }
- }
- ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
- thriftWriter.open(FileWriteOperation.OVERWRITE);
- thriftWriter.write(thriftTableInfo);
- thriftWriter.close();
- } catch (IOException e) {
- LOGGER.error(e, "Failed to handle create table");
- throw e;
- }
- }
-
- public void dropTable(TableIdentifier table) throws IOException {
- String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName());
- cache.remove(tablePath);
- FileFactory.deleteFile(tablePath);
- }
-
- public CarbonTable getTable(TableIdentifier table) throws IOException {
- String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName());
- if (cache.containsKey(tablePath)) {
- return cache.get(tablePath);
- } else {
- org.apache.carbondata.format.TableInfo formatTableInfo =
- CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo(
- formatTableInfo, table.getDatabaseName(), table.getTableName(), tablePath);
- tableInfo.setTablePath(tablePath);
- CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
- cache.put(tablePath, carbonTable);
- return carbonTable;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java
new file mode 100644
index 0000000..8125426
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+import org.apache.carbondata.store.impl.service.model.ScanResponse;
+
+/**
+ * A scanner that sends the scan request as an RPC to a remote Worker
+ * (Schedulable) and converts the returned rows using the given CarbonReadSupport.
+ *
+ * @param <T> type of the scan output row
+ */
+public class RemoteDataScanner<T> implements DataScanner<T> {
+
+ private TableInfo tableInfo;
+ private ScanDescriptor scanDescriptor;
+ private Map<String, String> scanOption;
+ private CarbonReadSupport<T> readSupport;
+
+ RemoteDataScanner(TableInfo tableInfo, ScanDescriptor scanDescriptor,
+ Map<String, String> scanOption, CarbonReadSupport<T> readSupport) {
+ this.tableInfo = tableInfo;
+ this.scanDescriptor = scanDescriptor;
+ this.scanOption = scanOption;
+ this.readSupport = readSupport;
+ }
+
+ @Override
+ public Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException {
+ List<CarbonInputSplit> toBeScan = new ArrayList<>();
+ if (input instanceof BlockScanUnit) {
+ toBeScan.add(((BlockScanUnit) input).getInputSplit());
+ } else {
+ throw new CarbonException(input.getClass().getName() + " is not supported");
+ }
+ int queryId = new Random().nextInt();
+ CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(toBeScan, input.preferredLocations());
+ try {
+ ScanRequest request = new ScanRequest(queryId, split, tableInfo,
+ scanDescriptor.getProjection(), scanDescriptor.getFilter(), scanDescriptor.getLimit());
+ DataService dataService =
+ DataServicePool.getOrCreateDataService(((BlockScanUnit) input).getSchedulable());
+ ScanResponse response = dataService.scan(request);
+ List<T> rows = Arrays.stream(response.getRows())
+ .map(row -> readSupport.readRow(row))
+ .collect(Collectors.toList());
+
+ return Collections.singletonList(new RowMajorResultBatch<>(rows)).iterator();
+ } catch (IOException e) {
+ throw new CarbonException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java
new file mode 100644
index 0000000..5fc485e
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.impl.service.PruneService;
+import org.apache.carbondata.store.impl.service.ServiceFactory;
+import org.apache.carbondata.store.impl.service.model.PruneRequest;
+import org.apache.carbondata.store.impl.service.model.PruneResponse;
+
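+/**
+ * A pruner that delegates pruning to a remote prune service via RPC.
+ */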
+public class RemotePruner implements Pruner {
+
+ private String pruneServiceHost;
+ private int pruneServicePort;
+
+ RemotePruner(String pruneServiceHost, int pruneServicePort) {
+ this.pruneServiceHost = pruneServiceHost;
+ this.pruneServicePort = pruneServicePort;
+ }
+
+ @Override
+ public List<ScanUnit> prune(TableIdentifier table, Expression filterExpression)
+ throws CarbonException {
+ try {
+ PruneRequest request = new PruneRequest(table, filterExpression);
+ PruneService pruneService = ServiceFactory.createPruneService(
+ pruneServiceHost, pruneServicePort);
+ PruneResponse response = pruneService.prune(request);
+ return response.getScanUnits();
+ } catch (IOException e) {
+ throw new CarbonException(e);
+ }
+ }
+}
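
RemotePruner is the counterpart of RemoteDataScanner above: it asks the prune
service for BlockScanUnit instances (the only ScanUnit type RemoteDataScanner
accepts), each carrying the Schedulable (Worker) that should scan it. A sketch
under the same assumptions as the local example (`host`/`port` locate the
prune service; `tableInfo`, `desc`, `options` and `readSupport` are prepared
by the caller):

    Pruner pruner = new RemotePruner(host, port);
    List<ScanUnit> units = pruner.prune(desc.getTableIdentifier(), desc.getFilter());

    DataScanner<Object[]> scanner =
        new RemoteDataScanner<>(tableInfo, desc, options, readSupport);
    for (ScanUnit unit : units) {
      // each scan() call is one RPC to the unit's Worker
      Iterator<? extends ResultBatch<Object[]>> batches = scanner.scan(unit);
      // drain the batches as in the local example
    }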
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java b/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java
new file mode 100644
index 0000000..1999a63
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.store.devapi.ResultBatch;
+
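+/**
+ * A {@link ResultBatch} that holds the scan output rows in row-major layout.
+ */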
+public class RowMajorResultBatch<T> implements ResultBatch<T> {
+
+ private Iterator<T> iterator;
+
+ RowMajorResultBatch(List<T> rows) {
+ Objects.requireNonNull(rows);
+ this.iterator = rows.iterator();
+ }
+
+ @Override
+ public boolean isColumnar() {
+ return false;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ return iterator.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
new file mode 100644
index 0000000..6e29b48
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class Schedulable implements Writable {
+
+ private String id;
+ private String address;
+ private int port;
+ private int cores;
+ public AtomicInteger workload;
+
+ public Schedulable() {
+ }
+
+ public Schedulable(String id, String address, int port, int cores) {
+ this.id = id;
+ this.address = address;
+ this.port = port;
+ this.cores = cores;
+ this.workload = new AtomicInteger();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public int getCores() {
+ return cores;
+ }
+
+ @Override
+ public String toString() {
+ return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port
+ + '}';
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(id);
+ out.writeUTF(address);
+ out.writeInt(port);
+ out.writeInt(cores);
+ // We are not writing workload since it is only useful for
+ // Scheduler inside the Master. Client of the Master does
+ // not need it
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readUTF();
+ address = in.readUTF();
+ port = in.readInt();
+ cores = in.readInt();
+ // workload is not serialized (see write()); re-initialize it so that a
+ // deserialized instance can still be used by the scheduler
+ workload = new AtomicInteger();
+ }
+}
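
Since Schedulable implements Hadoop's Writable, its wire format can be
exercised with plain java.io streams. A sketch of a serialization round-trip
(e.g. in a unit test; java.io imports omitted):

    static Schedulable roundTrip(Schedulable in) throws IOException {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      in.write(new DataOutputStream(buffer));   // id, address, port, cores
      Schedulable out = new Schedulable();
      out.readFields(new DataInputStream(
          new ByteArrayInputStream(buffer.toByteArray())));
      return out; // workload is re-initialized, not transported, see write()
    }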