You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/04 00:25:20 UTC

[05/10] incubator-kylin git commit: KYLIN-780 Upgrade query module, all query test pass

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 7f707c7..485ed3d 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -22,7 +22,7 @@
 	<artifactId>kylin-jdbc</artifactId>
 	<packaging>jar</packaging>
 	<name>Kylin:JDBC</name>
-	<description>Kylin JDBC Driver on optiq avatica</description>
+	<description>Kylin JDBC Driver on Calcite Avatica</description>
 
 	<parent>
 		<groupId>org.apache.kylin</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
index 5198fd7..4cc0445 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
@@ -1,147 +1,140 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.jdbc;
-
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-import net.hydromatic.avatica.AvaticaConnection;
-import net.hydromatic.avatica.AvaticaStatement;
-import net.hydromatic.avatica.DriverVersion;
-import net.hydromatic.avatica.Handler;
-import net.hydromatic.avatica.HandlerImpl;
-import net.hydromatic.avatica.UnregisteredDriver;
-
-import org.apache.kylin.jdbc.stub.RemoteClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.jdbc.stub.ConnectionException;
-
-/**
- * <p>
- * Kylin JDBC Driver based on optiq avatica and kylin restful api.<br>
- * Supported versions:
- * </p>
- * <ul>
- * <li>jdbc 4.0</li>
- * <li>jdbc 4.1</li>
- * </ul>
- * 
- * <p>
- * Supported Statements:
- * </p>
- * <ul>
- * <li>{@link KylinStatementImpl}</li>
- * <li>{@link KylinPrepareStatementImpl}</li>
- * </ul>
- * 
- * <p>
- * Supported properties:
- * <ul>
- * <li>user: username</li>
- * <li>password: password</li>
- * <li>ssl: true/false</li>
- * </ul>
- * </p>
- * 
- * <p>
- * Driver init code sample:<br>
- * 
- * <pre>
- * Driver driver = (Driver) Class.forName(&quot;org.apache.kylin.kylin.jdbc.Driver&quot;).newInstance();
- * Properties info = new Properties();
- * info.put(&quot;user&quot;, &quot;user&quot;);
- * info.put(&quot;password&quot;, &quot;password&quot;);
- * info.put(&quot;ssl&quot;, true);
- * Connection conn = driver.connect(&quot;jdbc:kylin://{domain}/{project}&quot;, info);
- * </pre>
- * 
- * </p>
- * 
- * @author xduo
- * 
- */
-public class Driver extends UnregisteredDriver {
-    private static final Logger logger = LoggerFactory.getLogger(Driver.class);
-
-    public static final String CONNECT_STRING_PREFIX = "jdbc:kylin:";
-    static {
-        try {
-            DriverManager.registerDriver(new Driver());
-        } catch (SQLException e) {
-            throw new RuntimeException("Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
-        }
-    }
-    
-    @Override
-    protected DriverVersion createDriverVersion() {
-        return DriverVersion.load(Driver.class, "org-apache-kylin-jdbc.properties", "Kylin JDBC Driver", "unknown version", "Kylin", "unknown version");
-    }
-
-    @Override
-    protected String getFactoryClassName(JdbcVersion jdbcVersion) {
-        switch (jdbcVersion) {
-        case JDBC_30:
-            throw new UnsupportedOperationException();
-        case JDBC_40:
-            return "org.apache.kylin.jdbc.KylinJdbc40Factory";
-        case JDBC_41:
-        default:
-            return "org.apache.kylin.jdbc.KylinJdbc41Factory";
-        }
-    }
-
-    @Override
-    protected Handler createHandler() {
-        return new HandlerImpl() {
-            @Override
-            public void onConnectionInit(AvaticaConnection connection_) throws SQLException {
-                KylinConnectionImpl kylinConn = (KylinConnectionImpl) connection_;
-                RemoteClient runner = ((KylinJdbc41Factory) factory).newRemoteClient(kylinConn);
-                try {
-                    runner.connect();
-                    kylinConn.setMetaProject(runner.getMetadata(kylinConn.getProject()));
-                    logger.debug("Connection inited.");
-                } catch (ConnectionException e) {
-                    logger.error(e.getLocalizedMessage(), e);
-                    throw new SQLException(e.getLocalizedMessage());
-                }
-            }
-
-            public void onConnectionClose(AvaticaConnection connection) {
-                logger.debug("Connection closed.");
-            }
-
-            public void onStatementExecute(AvaticaStatement statement, ResultSink resultSink) {
-                logger.debug("statement executed.");
-            }
-
-            public void onStatementClose(AvaticaStatement statement) {
-                logger.debug("statement closed.");
-            }
-        };
-    }
-
-    @Override
-    protected String getConnectStringPrefix() {
-        return CONNECT_STRING_PREFIX;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.jdbc;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.DriverVersion;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+/**
+ * <p>
+ * Kylin JDBC Driver based on Calcite Avatica and Kylin restful API.<br>
+ * Supported versions:
+ * </p>
+ * <ul>
+ * <li>jdbc 4.0</li>
+ * <li>jdbc 4.1</li>
+ * </ul>
+ * 
+ * <p>
+ * Supported Statements:
+ * </p>
+ * <ul>
+ * <li>{@link KylinStatementImpl}</li>
+ * <li>{@link KylinPrepareStatementImpl}</li>
+ * </ul>
+ * 
+ * <p>
+ * Supported properties:
+ * <ul>
+ * <li>user: username</li>
+ * <li>password: password</li>
+ * <li>ssl: true/false</li>
+ * </ul>
+ * </p>
+ * 
+ * <p>
+ * Driver init code sample:<br>
+ * 
+ * <pre>
+ * Driver driver = (Driver) Class.forName(&quot;org.apache.kylin.kylin.jdbc.Driver&quot;).newInstance();
+ * Properties info = new Properties();
+ * info.put(&quot;user&quot;, &quot;user&quot;);
+ * info.put(&quot;password&quot;, &quot;password&quot;);
+ * info.put(&quot;ssl&quot;, true);
+ * Connection conn = driver.connect(&quot;jdbc:kylin://{domain}/{project}&quot;, info);
+ * </pre>
+ * 
+ * </p>
+ */
+public class Driver extends UnregisteredDriver {
+
+    public static final String CONNECT_STRING_PREFIX = "jdbc:kylin:";
+    static {
+        try {
+            DriverManager.registerDriver(new Driver());
+        } catch (SQLException e) {
+            throw new RuntimeException("Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
+        }
+    }
+    
+    @Override
+    protected String getConnectStringPrefix() {
+        return CONNECT_STRING_PREFIX;
+    }
+
+    @Override
+    protected DriverVersion createDriverVersion() {
+        return DriverVersion.load(Driver.class, "kylin-jdbc.properties", "Kylin JDBC Driver", "unknown version", "Kylin", "unknown version");
+    }
+
+    @Override
+    protected String getFactoryClassName(JdbcVersion jdbcVersion) {
+        switch (jdbcVersion) {
+        case JDBC_30:
+            throw new UnsupportedOperationException();
+        case JDBC_40:
+            return KylinJdbcFactory.Version40.class.getName();
+        case JDBC_41:
+        default:
+            return KylinJdbcFactory.Version41.class.getName();
+        }
+    }
+
+    @Override
+    public Meta createMeta(AvaticaConnection connection) {
+        return new KylinMeta((KylinConnection) connection);
+    }
+
+//    @Override
+//    protected Handler createHandler() {
+//        return new HandlerImpl() {
+//            @Override
+//            public void onConnectionInit(AvaticaConnection connection) throws SQLException {
+//                KylinConnection conn = (KylinConnection) connection;
+//                RemoteClient runner = ((KylinJdbcFactory) factory).newRemoteClient(conn);
+//                try {
+//                    runner.connect();
+//                    conn.setMetaProject(runner.getMetadata(conn.getProject()));
+//                    logger.debug("Connection inited.");
+//                } catch (ConnectionException e) {
+//                    logger.error(e.getLocalizedMessage(), e);
+//                    throw new SQLException(e.getLocalizedMessage());
+//                }
+//            }
+//
+//            public void onConnectionClose(AvaticaConnection connection) {
+//                logger.debug("Connection closed.");
+//            }
+//
+//            public void onStatementExecute(AvaticaStatement statement, ResultSink resultSink) {
+//                logger.debug("statement executed.");
+//            }
+//
+//            public void onStatementClose(AvaticaStatement statement) {
+//                logger.debug("statement closed.");
+//            }
+//        };
+//    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
new file mode 100644
index 0000000..b6a13e5
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.jdbc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.kylin.jdbc.KylinMeta.KMetaProject;
+
+public interface IRemoteClient extends Closeable {
+
+    public static class QueryResult {
+        public final List<ColumnMetaData> columnMeta;
+        public final Iterable<Object> iterable;
+
+        public QueryResult(List<ColumnMetaData> columnMeta, Iterable<Object> iterable) {
+            this.columnMeta = columnMeta;
+            this.iterable = iterable;
+        }
+    }
+
+    /**
+     * Connect to Kylin restful service. IOException will be thrown if authentication failed.
+     */
+    public void connect() throws IOException;
+
+    /**
+     * Retrieve meta data of given project.
+     */
+    public KMetaProject retrieveMetaData(String project) throws IOException;
+
+    /**
+     * Execute query remotely and get back result.
+     */
+    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
new file mode 100644
index 0000000..4b73ad6
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
@@ -0,0 +1,373 @@
+package org.apache.kylin.jdbc;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.ColumnMetaData.Rep;
+import org.apache.calcite.avatica.ColumnMetaData.ScalarType;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethodBase;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.httpclient.protocol.Protocol;
+import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
+import org.apache.kylin.jdbc.KylinMeta.KMetaCatalog;
+import org.apache.kylin.jdbc.KylinMeta.KMetaColumn;
+import org.apache.kylin.jdbc.KylinMeta.KMetaProject;
+import org.apache.kylin.jdbc.KylinMeta.KMetaSchema;
+import org.apache.kylin.jdbc.KylinMeta.KMetaTable;
+import org.apache.kylin.jdbc.json.PreparedQueryRequest;
+import org.apache.kylin.jdbc.json.QueryRequest;
+import org.apache.kylin.jdbc.json.SQLResponseStub;
+import org.apache.kylin.jdbc.json.StatementParameter;
+import org.apache.kylin.jdbc.json.TableMetaStub;
+import org.apache.kylin.jdbc.json.TableMetaStub.ColumnMetaStub;
+import org.apache.kylin.jdbc.util.DefaultSslProtocolSocketFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class KylinClient implements IRemoteClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(KylinClient.class);
+
+    private final KylinConnection conn;
+    private final Properties connProps;
+    private final HttpClient httpClient;
+    private final ObjectMapper jsonMapper;
+
+    public KylinClient(KylinConnection conn) {
+        this.conn = conn;
+        this.connProps = conn.getConnectionProperties();
+        this.httpClient = new HttpClient();
+        this.jsonMapper = new ObjectMapper();
+
+        // trust all certificates
+        if (isSSL()) {
+            Protocol.registerProtocol("https", new Protocol("https", (ProtocolSocketFactory) new DefaultSslProtocolSocketFactory(), 443));
+        }
+    }
+
+    private boolean isSSL() {
+        return Boolean.parseBoolean(connProps.getProperty("ssl", "false"));
+    }
+
+    private String baseUrl() {
+        return (isSSL() ? "https://" : "http://") + conn.getBaseUrl();
+    }
+
+    private void addHttpHeaders(HttpMethodBase method) {
+        method.addRequestHeader("Accept", "application/json, text/plain, */*");
+        method.addRequestHeader("Content-Type", "application/json");
+
+        String username = connProps.getProperty("user");
+        String password = connProps.getProperty("password");
+        String basicAuth = DatatypeConverter.printBase64Binary((username + ":" + password).getBytes());
+        method.addRequestHeader("Authorization", "Basic " + basicAuth);
+    }
+
+    @Override
+    public void connect() throws IOException {
+        PostMethod post = new PostMethod(baseUrl() + "/kylin/api/user/authentication");
+        addHttpHeaders(post);
+        StringRequestEntity requestEntity = new StringRequestEntity("{}", "application/json", "UTF-8");
+        post.setRequestEntity(requestEntity);
+
+        httpClient.executeMethod(post);
+
+        if (post.getStatusCode() != 200 && post.getStatusCode() != 201) {
+            throw asIOException(post);
+        }
+    }
+
+    @Override
+    public KMetaProject retrieveMetaData(String project) throws IOException {
+        assert conn.getProject().equals(project);
+
+        String url = baseUrl() + "/kylin/api/tables_and_columns?project=" + project;
+        GetMethod get = new GetMethod(url);
+        addHttpHeaders(get);
+
+        httpClient.executeMethod(get);
+
+        if (get.getStatusCode() != 200 && get.getStatusCode() != 201) {
+            throw asIOException(get);
+        }
+
+        List<TableMetaStub> tableMetaStubs = jsonMapper.readValue(get.getResponseBodyAsStream(), new TypeReference<List<TableMetaStub>>() {
+        });
+
+        List<KMetaTable> tables = convertMetaTables(tableMetaStubs);
+        List<KMetaSchema> schemas = convertMetaSchemas(tables);
+        List<KMetaCatalog> catalogs = convertMetaCatalogs(schemas);
+        return new KMetaProject(project, catalogs);
+    }
+
+    private List<KMetaCatalog> convertMetaCatalogs(List<KMetaSchema> schemas) {
+        Map<String, List<KMetaSchema>> catalogMap = new LinkedHashMap<String, List<KMetaSchema>>();
+        for (KMetaSchema schema : schemas) {
+            List<KMetaSchema> list = catalogMap.get(schema.tableCatalog);
+            if (list == null) {
+                list = new ArrayList<KMetaSchema>();
+                catalogMap.put(schema.tableCatalog, list);
+            }
+            list.add(schema);
+        }
+        
+        List<KMetaCatalog> result = new ArrayList<KMetaCatalog>();
+        for (List<KMetaSchema> catSchemas : catalogMap.values()) {
+            String catalog = catSchemas.get(0).tableCatalog;
+            result.add(new KMetaCatalog(catalog, catSchemas));
+        }
+        return result;
+    }
+
+    private List<KMetaSchema> convertMetaSchemas(List<KMetaTable> tables) {
+        Map<String, List<KMetaTable>> schemaMap = new LinkedHashMap<String, List<KMetaTable>>();
+        for (KMetaTable table : tables) {
+            String key = table.tableCat + "!!" + table.tableSchem;
+            List<KMetaTable> list = schemaMap.get(key);
+            if (list == null) {
+                list = new ArrayList<KMetaTable>();
+                schemaMap.put(key, list);
+            }
+            list.add(table);
+        }
+        
+        List<KMetaSchema> result = new ArrayList<KMetaSchema>();
+        for (List<KMetaTable> schemaTables : schemaMap.values()) {
+            String catalog = schemaTables.get(0).tableCat;
+            String schema = schemaTables.get(0).tableSchem;
+            result.add(new KMetaSchema(catalog, schema, schemaTables));
+        }
+        return result;
+    }
+
+    private List<KMetaTable> convertMetaTables(List<TableMetaStub> tableMetaStubs) {
+        List<KMetaTable> result = new ArrayList<KMetaTable>(tableMetaStubs.size());
+        for (TableMetaStub tableStub : tableMetaStubs) {
+            result.add(convertMetaTable(tableStub));
+        }
+        return result;
+    }
+
+    private KMetaTable convertMetaTable(TableMetaStub tableStub) {
+        List<KMetaColumn> columns = new ArrayList<KMetaColumn>(tableStub.getColumns().size());
+        for (ColumnMetaStub columnStub : tableStub.getColumns()) {
+            columns.add(convertMetaColumn(columnStub));
+        }
+        return new KMetaTable(tableStub.getTABLE_CAT(), tableStub.getTABLE_SCHEM(), tableStub.getTABLE_NAME(), tableStub.getTABLE_TYPE(), columns);
+    }
+
+    private KMetaColumn convertMetaColumn(ColumnMetaStub columnStub) {
+        return new KMetaColumn(columnStub.getTABLE_CAT(), columnStub.getTABLE_SCHEM(), columnStub.getTABLE_NAME(), columnStub.getCOLUMN_NAME(), columnStub.getDATA_TYPE(), columnStub.getTYPE_NAME(), columnStub.getCOLUMN_SIZE(), columnStub.getDECIMAL_DIGITS(), columnStub.getNUM_PREC_RADIX(), columnStub.getNULLABLE(), columnStub.getCHAR_OCTET_LENGTH(), columnStub.getORDINAL_POSITION(), columnStub.getIS_NULLABLE());
+    }
+
+    @Override
+    public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues) throws IOException {
+
+        SQLResponseStub queryResp = executeKylinQuery(sql, convertParameters(params, paramValues));
+        if (queryResp.getIsException())
+            throw new IOException(queryResp.getExceptionMessage());
+
+        List<ColumnMetaData> metas = convertColumnMeta(queryResp);
+        List<Object> data = convertResultData(queryResp, metas);
+
+        return new QueryResult(metas, data);
+    }
+
+    private List<StatementParameter> convertParameters(List<AvaticaParameter> params, List<Object> paramValues) {
+        if (params == null || params.isEmpty())
+            return null;
+
+        assert params.size() == paramValues.size();
+
+        List<StatementParameter> result = new ArrayList<StatementParameter>();
+        for (Object v : paramValues) {
+            result.add(new StatementParameter(v.getClass().getCanonicalName(), String.valueOf(v)));
+        }
+        return result;
+    }
+
+    private SQLResponseStub executeKylinQuery(String sql, List<StatementParameter> params) throws IOException {
+        String url = baseUrl() + "/kylin/api/query";
+        String project = conn.getProject();
+
+        QueryRequest request = null;
+        if (null != params) {
+            request = new PreparedQueryRequest();
+            ((PreparedQueryRequest) request).setParams(params);
+            url += "/prestate"; // means prepared statement..
+        } else {
+            request = new QueryRequest();
+        }
+        request.setSql(sql);
+        request.setProject(project);
+
+        PostMethod post = new PostMethod(url);
+        addHttpHeaders(post);
+
+        String postBody = jsonMapper.writeValueAsString(request);
+        logger.debug("Post body:\n " + postBody);
+        StringRequestEntity requestEntity = new StringRequestEntity(postBody, "application/json", "UTF-8");
+        post.setRequestEntity(requestEntity);
+
+        httpClient.executeMethod(post);
+
+        if (post.getStatusCode() != 200 && post.getStatusCode() != 201) {
+            throw asIOException(post);
+        }
+
+        return jsonMapper.readValue(post.getResponseBodyAsStream(), SQLResponseStub.class);
+    }
+
+    private List<ColumnMetaData> convertColumnMeta(SQLResponseStub queryResp) {
+        List<ColumnMetaData> metas = new ArrayList<ColumnMetaData>();
+        for (int i = 0; i < queryResp.getColumnMetas().size(); i++) {
+            SQLResponseStub.ColumnMetaStub scm = queryResp.getColumnMetas().get(i);
+            ScalarType type = ColumnMetaData.scalar(scm.getColumnType(), scm.getColumnTypeName(), Rep.of(convertType(scm.getColumnType())));
+
+            ColumnMetaData meta = new ColumnMetaData(i, scm.isAutoIncrement(), scm.isCaseSensitive(), scm.isSearchable(), scm.isCurrency(), scm.getIsNullable(), scm.isSigned(), scm.getDisplaySize(), scm.getLabel(), scm.getName(), scm.getSchemaName(), scm.getPrecision(), scm.getScale(), scm.getTableName(), scm.getSchemaName(), type, scm.isReadOnly(), scm.isWritable(), scm.isWritable(), null);
+
+            metas.add(meta);
+        }
+
+        return metas;
+    }
+
+    private List<Object> convertResultData(SQLResponseStub queryResp, List<ColumnMetaData> metas) {
+        List<String[]> stringResults = queryResp.getResults();
+        List<Object> data = new ArrayList<Object>(stringResults.size());
+        for (String[] result : stringResults) {
+            Object[] row = new Object[result.length];
+
+            for (int i = 0; i < result.length; i++) {
+                ColumnMetaData meta = metas.get(i);
+                row[i] = wrapObject(result[i], meta.type.type);
+            }
+
+            data.add(row);
+        }
+        return (List<Object>) data;
+    }
+
+    private IOException asIOException(HttpMethodBase method) throws IOException {
+        return new IOException(method + " failed, error code " + method.getStatusCode() + " and response: " + method.getResponseBodyAsString());
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static Class convertType(int sqlType) {
+        Class result = Object.class;
+
+        switch (sqlType) {
+        case Types.CHAR:
+        case Types.VARCHAR:
+        case Types.LONGVARCHAR:
+            result = String.class;
+            break;
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+            result = BigDecimal.class;
+            break;
+        case Types.BIT:
+            result = Boolean.class;
+            break;
+        case Types.TINYINT:
+            result = Byte.class;
+            break;
+        case Types.SMALLINT:
+            result = Short.class;
+            break;
+        case Types.INTEGER:
+            result = Integer.class;
+            break;
+        case Types.BIGINT:
+            result = Long.class;
+            break;
+        case Types.REAL:
+        case Types.FLOAT:
+        case Types.DOUBLE:
+            result = Double.class;
+            break;
+        case Types.BINARY:
+        case Types.VARBINARY:
+        case Types.LONGVARBINARY:
+            result = Byte[].class;
+            break;
+        case Types.DATE:
+            result = Date.class;
+            break;
+        case Types.TIME:
+            result = Time.class;
+            break;
+        case Types.TIMESTAMP:
+            result = Timestamp.class;
+            break;
+        }
+
+        return result;
+    }
+
+    public static Object wrapObject(String value, int sqlType) {
+        if (null == value) {
+            return null;
+        }
+
+        switch (sqlType) {
+        case Types.CHAR:
+        case Types.VARCHAR:
+        case Types.LONGVARCHAR:
+            return value;
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+            return new BigDecimal(value);
+        case Types.BIT:
+            return Boolean.parseBoolean(value);
+        case Types.TINYINT:
+            return Byte.valueOf(value);
+        case Types.SMALLINT:
+            return Short.valueOf(value);
+        case Types.INTEGER:
+            return Integer.parseInt(value);
+        case Types.BIGINT:
+            return Long.parseLong(value);
+        case Types.FLOAT:
+            return Float.parseFloat(value);
+        case Types.REAL:
+        case Types.DOUBLE:
+            return Double.parseDouble(value);
+        case Types.BINARY:
+        case Types.VARBINARY:
+        case Types.LONGVARBINARY:
+            return value.getBytes();
+        case Types.DATE:
+            return Date.valueOf(value);
+        case Types.TIME:
+            return Time.valueOf(value);
+        case Types.TIMESTAMP:
+            return Timestamp.valueOf(value);
+        }
+
+        return value;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
new file mode 100644
index 0000000..86b2ff8
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.Meta.CursorFactory;
+import org.apache.calcite.avatica.Meta.Signature;
+import org.apache.calcite.avatica.UnregisteredDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KylinConnection extends AvaticaConnection {
+
+    private static final Logger logger = LoggerFactory.getLogger(KylinConnection.class);
+
+    private final String baseUrl;
+    private final String project;
+    private final IRemoteClient remoteClient;
+
+    protected KylinConnection(UnregisteredDriver driver, KylinJdbcFactory factory, String url, Properties info) throws SQLException {
+        super(driver, factory, url, info);
+        
+        String odbcUrl = url;
+        odbcUrl = odbcUrl.replace(Driver.CONNECT_STRING_PREFIX + "//", "");
+        String[] temps = odbcUrl.split("/");
+
+        assert temps.length == 2;
+
+        this.baseUrl = temps[0];
+        this.project = temps[1];
+
+        logger.debug("Kylin base url " + this.baseUrl + ", project name " + this.project);
+        
+        this.remoteClient = factory.newRemoteClient(this);
+
+        try {
+            this.remoteClient.connect();
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+    
+    String getBaseUrl() {
+        return baseUrl;
+    }
+    
+    String getProject() {
+        return project;
+    }
+    
+    Properties getConnectionProperties() {
+        return info;
+    }
+    
+    @Override
+    public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        Meta.Signature sig = mockPreparedSignature(sql);
+        return factory().newPreparedStatement(this, null, sig, resultSetType, resultSetConcurrency, resultSetHoldability);
+    }
+    
+    // TODO add restful API to prepare SQL, get back expected ResultSetMetaData
+    Signature mockPreparedSignature(String sql) {
+        List<AvaticaParameter> params = new ArrayList<AvaticaParameter>();
+        int startIndex = 0;
+        while (sql.indexOf("?", startIndex) >= 0) {
+            AvaticaParameter param = new AvaticaParameter(false, 0, 0, 0, null, null, null);
+            params.add(param);
+            startIndex = sql.indexOf("?", startIndex) + 1;
+        }
+
+        ArrayList<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();
+        Map<String, Object> internalParams = Collections.<String, Object>emptyMap();
+        
+        return new Meta.Signature(columns, sql, params, internalParams, CursorFactory.ARRAY);
+    }
+
+    private KylinJdbcFactory factory() {
+        return (KylinJdbcFactory) factory;
+    }
+
+    public IRemoteClient getRemoteClient() {
+        return remoteClient;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        super.close();
+        try {
+            remoteClient.close();
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnectionImpl.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnectionImpl.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnectionImpl.java
deleted file mode 100644
index 887bd66..0000000
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnectionImpl.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.jdbc;
-
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import javax.xml.bind.DatatypeConverter;
-
-import net.hydromatic.avatica.AvaticaConnection;
-import net.hydromatic.avatica.AvaticaFactory;
-import net.hydromatic.avatica.AvaticaPreparedStatement;
-import net.hydromatic.avatica.AvaticaStatement;
-import net.hydromatic.avatica.Meta;
-import net.hydromatic.avatica.UnregisteredDriver;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.jdbc.KylinPrepare.PrepareResult;
-
-/**
- * Kylin connection implementation
- * 
- * @author xduo
- * 
- */
-public abstract class KylinConnectionImpl extends AvaticaConnection {
-    private static final Logger logger = LoggerFactory.getLogger(KylinConnectionImpl.class);
-
-    private final String baseUrl;
-    private final String project;
-    private KylinMetaImpl.MetaProject metaProject;
-    public final List<AvaticaStatement> statements;
-    static final Trojan TROJAN = createTrojan();
-
-    protected KylinConnectionImpl(UnregisteredDriver driver, AvaticaFactory factory, String url, Properties info) {
-        super(driver, factory, url, info);
-
-        String odbcUrl = url;
-        odbcUrl = odbcUrl.replace(Driver.CONNECT_STRING_PREFIX + "//", "");
-        String[] temps = odbcUrl.split("/");
-
-        assert temps.length == 2;
-
-        this.baseUrl = temps[0];
-        this.project = temps[1];
-
-        logger.debug("Kylin base url " + this.baseUrl + ", project name " + this.project);
-
-        statements = new ArrayList<AvaticaStatement>();
-    }
-
-    @Override
-    protected Meta createMeta() {
-        return new KylinMetaImpl(this, (KylinJdbc41Factory) factory);
-    }
-
-    @Override
-    public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-        AvaticaStatement statement = super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
-        statements.add(statement);
-
-        return statement;
-    }
-
-    @Override
-    public PreparedStatement prepareStatement(String sql) throws SQLException {
-        PrepareResult pr = new KylinPrepareImpl().prepare(sql);
-        AvaticaPreparedStatement statement = ((KylinJdbc41Factory) factory).newPreparedStatement(this, pr, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, this.getHoldability());
-        statements.add(statement);
-
-        return statement;
-    }
-
-    // ~ kylin specified implements
-
-    public String getBasicAuthHeader() {
-        String username = this.info.getProperty("user");
-        String password = this.info.getProperty("password");
-
-        return DatatypeConverter.printBase64Binary((username + ":" + password).getBytes());
-    }
-
-    public String getConnectUrl() {
-        boolean isSsl = Boolean.parseBoolean((this.info.getProperty("ssl", "false")));
-        return (isSsl ? "https://" : "http://") + this.baseUrl + "/kylin/api/user/authentication";
-    }
-
-    public String getMetaProjectUrl(String project) {
-        assert project != null;
-        boolean isSsl = Boolean.parseBoolean((this.info.getProperty("ssl", "false")));
-        return (isSsl ? "https://" : "http://") + this.baseUrl + "/kylin/api/tables_and_columns?project=" + project;
-    }
-
-    public String getQueryUrl() {
-        boolean isSsl = Boolean.parseBoolean((this.info.getProperty("ssl", "false")));
-        return (isSsl ? "https://" : "http://") + this.baseUrl + "/kylin/api/query";
-    }
-
-    public String getProject() {
-        return this.project;
-    }
-
-    public Meta getMeta() {
-        return this.meta;
-    }
-
-    public AvaticaFactory getFactory() {
-        return this.factory;
-    }
-
-    public UnregisteredDriver getDriver() {
-        return this.driver;
-    }
-
-    public KylinMetaImpl.MetaProject getMetaProject() {
-        return metaProject;
-    }
-
-    public void setMetaProject(KylinMetaImpl.MetaProject metaProject) {
-        this.metaProject = metaProject;
-    }
-
-    @Override
-    public void close() throws SQLException {
-        super.close();
-
-        this.metaProject = null;
-        this.statements.clear();
-    }
-
-    @Override
-    public String toString() {
-        return "KylinConnectionImpl [baseUrl=" + baseUrl + ", project=" + project + ", metaProject=" + metaProject + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinEnumerator.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinEnumerator.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinEnumerator.java
deleted file mode 100644
index 04d9231..0000000
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinEnumerator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.jdbc;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import net.hydromatic.linq4j.Enumerator;
-
-/**
- * Query results enumerator
- * 
- * @author xduo
- * 
- */
-public class KylinEnumerator<E> implements Enumerator<E> {
-
-    /**
-     * current row
-     */
-    private E current;
-
-    /**
-     * data collection
-     */
-    private Collection<E> dataCollection;
-
-    /**
-     * result iterator
-     */
-    private Iterator<E> cursor;
-
-    public KylinEnumerator(Collection<E> dataCollection) {
-        this.dataCollection = dataCollection;
-        this.cursor = this.dataCollection.iterator();
-
-        if (null == this.cursor) {
-            throw new RuntimeException("Cursor can't be null");
-        }
-    }
-
-    @Override
-    public E current() {
-        return current;
-    }
-
-    @Override
-    public boolean moveNext() {
-        if (!cursor.hasNext()) {
-            this.reset();
-
-            return false;
-        }
-
-        current = cursor.next();
-
-        return true;
-    }
-
-    @Override
-    public void reset() {
-        this.cursor = this.dataCollection.iterator();
-    }
-
-    @Override
-    public void close() {
-        this.cursor = null;
-        this.dataCollection = null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc40Factory.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc40Factory.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc40Factory.java
deleted file mode 100644
index 235348b..0000000
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc40Factory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.jdbc;
-
-/**
- * @author xduo
- * 
- */
-public class KylinJdbc40Factory extends KylinJdbc41Factory {
-
-    public KylinJdbc40Factory() {
-        super(4, 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc41Factory.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc41Factory.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc41Factory.java
deleted file mode 100644
index 90ff2e2..0000000
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbc41Factory.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.jdbc;
-
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
-import java.util.TimeZone;
-
-import net.hydromatic.avatica.AvaticaConnection;
-import net.hydromatic.avatica.AvaticaDatabaseMetaData;
-import net.hydromatic.avatica.AvaticaFactory;
-import net.hydromatic.avatica.AvaticaPrepareResult;
-import net.hydromatic.avatica.AvaticaPreparedStatement;
-import net.hydromatic.avatica.AvaticaResultSet;
-import net.hydromatic.avatica.AvaticaResultSetMetaData;
-import net.hydromatic.avatica.AvaticaStatement;
-import net.hydromatic.avatica.ColumnMetaData;
-import net.hydromatic.avatica.UnregisteredDriver;
-
-import org.apache.kylin.jdbc.stub.KylinClient;
-import org.apache.kylin.jdbc.stub.RemoteClient;
-
-/**
- * Kylin JDBC factory.
- * 
- * @author xduo
- * 
- */
-public class KylinJdbc41Factory implements AvaticaFactory {
-    private final int major;
-    private final int minor;
-
-    /** Creates a JDBC factory. */
-    public KylinJdbc41Factory() {
-        this(4, 1);
-    }
-
-    /** Creates a JDBC factory with given major/minor version number. */
-    protected KylinJdbc41Factory(int major, int minor) {
-        this.major = major;
-        this.minor = minor;
-    }
-
-    public int getJdbcMajorVersion() {
-        return major;
-    }
-
-    public int getJdbcMinorVersion() {
-        return minor;
-    }
-
-    public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url, Properties info) {
-        return new KylinJdbc41Connection(driver, factory, url, info);
-    }
-
-    public AvaticaDatabaseMetaData newDatabaseMetaData(AvaticaConnection connection) {
-        return new AvaticaJdbc41DatabaseMetaData(connection);
-    }
-
-    public AvaticaStatement newStatement(AvaticaConnection connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
-        return new KylinJdbc41Statement(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
-    }
-
-    public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, AvaticaPrepareResult prepareResult, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-        return new KylinJdbc41PreparedStatement(connection, prepareResult, resultSetType, resultSetConcurrency, resultSetHoldability);
-    }
-
-    public AvaticaResultSet newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) {
-        final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList());
-        return new KylinResultSet(statement, prepareResult, metaData, timeZone);
-    }
-
-    public AvaticaResultSetMetaData newResultSetMetaData(AvaticaStatement statement, List<ColumnMetaData> columnMetaDataList) {
-        return new AvaticaResultSetMetaData(statement, null, columnMetaDataList);
-    }
-
-    // ~ kylin sepcified
-    public RemoteClient newRemoteClient(KylinConnectionImpl connection) {
-        return new KylinClient(connection);
-    }
-
-    /** Implementation of Connection for JDBC 4.1. */
-    private static class KylinJdbc41Connection extends KylinConnectionImpl {
-        KylinJdbc41Connection(UnregisteredDriver driver, AvaticaFactory factory, String url, Properties info) {
-            super(driver, (KylinJdbc41Factory) factory, url, info);
-        }
-    }
-
-    /** Implementation of Statement for JDBC 4.1. */
-    public static class KylinJdbc41Statement extends KylinStatementImpl {
-        public KylinJdbc41Statement(AvaticaConnection connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
-            super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
-        }
-    }
-
-    /** Implementation of PreparedStatement for JDBC 4.1. */
-    public static class KylinJdbc41PreparedStatement extends KylinPrepareStatementImpl {
-        KylinJdbc41PreparedStatement(AvaticaConnection connection, AvaticaPrepareResult sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-            super(connection, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
-        }
-    }
-
-    /** Implementation of DatabaseMetaData for JDBC 4.1. */
-    private static class AvaticaJdbc41DatabaseMetaData extends AvaticaDatabaseMetaData {
-        AvaticaJdbc41DatabaseMetaData(AvaticaConnection connection) {
-            super(connection);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
new file mode 100644
index 0000000..030dbbb
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.jdbc;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaDatabaseMetaData;
+import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.AvaticaPreparedStatement;
+import org.apache.calcite.avatica.AvaticaResultSet;
+import org.apache.calcite.avatica.AvaticaResultSetMetaData;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.Meta.Signature;
+import org.apache.calcite.avatica.Meta.StatementHandle;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+/**
+ * Kylin JDBC factory.
+ */
+public class KylinJdbcFactory implements AvaticaFactory {
+
+    public static class Version40 extends KylinJdbcFactory {
+        public Version40() {
+            super(4, 0);
+        }
+    }
+
+    public static class Version41 extends KylinJdbcFactory {
+        public Version41() {
+            super(4, 1);
+        }
+    }
+
+    final int major;
+    final int minor;
+
+    /** Creates a JDBC factory with given major/minor version number. */
+    protected KylinJdbcFactory(int major, int minor) {
+        this.major = major;
+        this.minor = minor;
+    }
+
+    @Override
+    public int getJdbcMajorVersion() {
+        return major;
+    }
+
+    @Override
+    public int getJdbcMinorVersion() {
+        return minor;
+    }
+
+    @Override
+    public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url, Properties info) throws SQLException {
+        return new KylinConnection(driver, (KylinJdbcFactory) factory, url, info);
+    }
+
+    @Override
+    public AvaticaDatabaseMetaData newDatabaseMetaData(AvaticaConnection connection) {
+        return new AvaticaDatabaseMetaData(connection) {
+        };
+    }
+
+    @Override
+    public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency, resultSetHoldability);
+    }
+
+    @Override
+    public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, StatementHandle h, Signature signature, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        return new KylinPreparedStatement((KylinConnection) connection, h, signature, resultSetType, resultSetConcurrency, resultSetHoldability);
+    }
+
+    @Override
+    public AvaticaResultSet newResultSet(AvaticaStatement statement, Signature signature, TimeZone timeZone, Iterable<Object> iterable) throws SQLException {
+        AvaticaResultSetMetaData resultSetMetaData = new AvaticaResultSetMetaData(statement, null, signature);
+        return new KylinResultSet(statement, signature, resultSetMetaData, timeZone, iterable);
+    }
+
+    @Override
+    public ResultSetMetaData newResultSetMetaData(AvaticaStatement statement, Signature signature) throws SQLException {
+        return new AvaticaResultSetMetaData(statement, null, signature);
+    }
+
+    public IRemoteClient newRemoteClient(KylinConnection conn) {
+        return new KylinClient(conn);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22a0bc33/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
new file mode 100644
index 0000000..72d7fb1
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.jdbc;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.MetaImpl;
+
+/**
+ * Implementation of Avatica interface
+ */
+public class KylinMeta extends MetaImpl {
+
+    private KMetaProject metaProject;
+
+    public KylinMeta(KylinConnection connection) {
+        super(connection);
+    }
+
+    private KylinConnection connection() {
+        return (KylinConnection) connection;
+    }
+
+    // insert/update/delete go this path, ignorable for Kylin
+    @Override
+    public Signature prepare(StatementHandle h, String sql, int maxRowCount) {
+        return connection().mockPreparedSignature(sql);
+    }
+
+    // mimic from CalciteMetaImpl, real execution happens via callback in KylinResultSet.execute()
+    @Override
+    public MetaResultSet prepareAndExecute(StatementHandle h, String sql, int maxRowCount, PrepareCallback callback) {
+        final Signature signature;
+        try {
+            synchronized (callback.getMonitor()) {
+                callback.clear();
+                signature = prepare(h, sql, maxRowCount);
+                callback.assign(signature, null);
+            }
+            callback.execute();
+            return new MetaResultSet(h.id, false, signature, null);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private KMetaProject getMetaProject() {
+        try {
+            if (metaProject == null) {
+                KylinConnection conn = connection();
+                metaProject = conn.getRemoteClient().retrieveMetaData(conn.getProject());
+            }
+            return metaProject;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public MetaResultSet getTableTypes() {
+        return createResultSet(metaTableTypes, MetaTableType.class, "TABLE_TYPE");
+    }
+
+    @Override
+    public MetaResultSet getCatalogs() {
+        List<KMetaCatalog> catalogs = getMetaProject().catalogs;
+        return createResultSet(catalogs, MetaCatalog.class, "TABLE_CATALOG");
+    }
+
+    @Override
+    public MetaResultSet getSchemas(String catalog, Pat schemaPattern) {
+        List<KMetaSchema> schemas = getMetaProject().getSchemas(catalog, schemaPattern);
+        return createResultSet(schemas, MetaSchema.class, "TABLE_SCHEM", "TABLE_CATALOG");
+    }
+
+    @Override
+    public MetaResultSet getTables(String catalog, Pat schemaPattern, Pat tableNamePattern, List<String> typeList) {
+        List<KMetaTable> tables = getMetaProject().getTables(catalog, schemaPattern, tableNamePattern, typeList);
+        return createResultSet(tables, MetaTable.class, //
+                "TABLE_CAT", //
+                "TABLE_SCHEM", //
+                "TABLE_NAME", //
+                "TABLE_TYPE", //
+                "REMARKS", //
+                "TYPE_CAT", //
+                "TYPE_SCHEM", //
+                "TYPE_NAME", //
+                "SELF_REFERENCING_COL_NAME", //
+                "REF_GENERATION");
+    }
+
+    @Override
+    public MetaResultSet getColumns(String catalog, Pat schemaPattern, Pat tableNamePattern, Pat columnNamePattern) {
+        List<KMetaColumn> columns = getMetaProject().getColumns(catalog, schemaPattern, tableNamePattern, columnNamePattern);
+        return createResultSet(columns, MetaColumn.class, //
+                "TABLE_CAT", //
+                "TABLE_SCHEM", //
+                "TABLE_NAME", //
+                "COLUMN_NAME", //
+                "DATA_TYPE", //
+                "TYPE_NAME", //
+                "COLUMN_SIZE", //
+                "BUFFER_LENGTH", //
+                "DECIMAL_DIGITS", //
+                "NUM_PREC_RADIX", //
+                "NULLABLE", //
+                "REMARKS", //
+                "COLUMN_DEF", //
+                "SQL_DATA_TYPE", //
+                "SQL_DATETIME_SUB", //
+                "CHAR_OCTET_LENGTH", //
+                "ORDINAL_POSITION", //
+                "IS_NULLABLE", //
+                "SCOPE_CATALOG", //
+                "SCOPE_TABLE", //
+                "SOURCE_DATA_TYPE", //
+                "IS_AUTOINCREMENT", //
+                "IS_GENERATEDCOLUMN");
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private MetaResultSet createResultSet(List iterable, Class clazz, String... names) {
+        final List<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();
+        final List<Field> fields = new ArrayList<Field>();
+        final List<String> fieldNames = new ArrayList<String>();
+        for (String name : names) {
+            final int index = fields.size();
+            final String fieldName = AvaticaUtils.toCamelCase(name);
+            final Field field;
+            try {
+                field = clazz.getField(fieldName);
+            } catch (NoSuchFieldException e) {
+                throw new RuntimeException(e);
+            }
+            columns.add(columnMetaData(name, index, field.getType()));
+            fields.add(field);
+            fieldNames.add(fieldName);
+        }
+
+        CursorFactory cursorFactory = CursorFactory.record(clazz, fields, fieldNames);
+        Signature signature = new Signature(columns, "", null, Collections.<String, Object> emptyMap(), cursorFactory);
+        StatementHandle statementHandle = this.createStatement(null);
+
+        return new MetaResultSet(statementHandle.id, true, signature, iterable);
+    }
+
+    // ============================================================================
+
+    public static interface NamedWithChildren extends Named {
+        List<? extends NamedWithChildren> getChildren();
+    }
+
+    public static List<? extends NamedWithChildren> searchByPatterns(NamedWithChildren parent, Pat... patterns) {
+        assert patterns != null && patterns.length > 0;
+        
+        List<? extends NamedWithChildren> children = findChildren(parent, patterns[0]);
+        if (patterns.length == 1) {
+            return children;
+        } else {
+            List<NamedWithChildren> result = new ArrayList<NamedWithChildren>();
+            Pat[] subPatterns = Arrays.copyOfRange(patterns, 1, patterns.length);
+            for (NamedWithChildren c : children) {
+                result.addAll(searchByPatterns(c, subPatterns));
+            }
+            return result;
+        }
+    }
+
+    private static List<? extends NamedWithChildren> findChildren(NamedWithChildren parent, Pat pattern) {
+        if (null == pattern.s || pattern.s.equals("%")) {
+            return parent.getChildren();
+        }
+
+        List<NamedWithChildren> result = new ArrayList<NamedWithChildren>();
+        Pattern regex = likeToRegex(pattern);
+
+        for (NamedWithChildren c : parent.getChildren()) {
+            if (regex.matcher(c.getName()).matches()) {
+                result.add(c);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Converts a LIKE-style pattern (where '%' represents a wild-card,
+     * escaped using '\') to a Java regex.
+     */
+    private static Pattern likeToRegex(Pat pattern) {
+        StringBuilder buf = new StringBuilder("^");
+        char[] charArray = pattern.s.toCharArray();
+        int slash = -2;
+        for (int i = 0; i < charArray.length; i++) {
+            char c = charArray[i];
+            if (slash == i - 1) {
+                buf.append('[').append(c).append(']');
+            } else {
+                switch (c) {
+                case '\\':
+                    slash = i;
+                    break;
+                case '%':
+                    buf.append(".*");
+                    break;
+                case '[':
+                    buf.append("\\[");
+                    break;
+                case ']':
+                    buf.append("\\]");
+                    break;
+                default:
+                    buf.append('[').append(c).append(']');
+                }
+            }
+        }
+        buf.append("$");
+
+        return Pattern.compile(buf.toString());
+    }
+
+    // ============================================================================
+
+    private static final List<MetaTableType> metaTableTypes = new ArrayList<MetaTableType>();
+    static {
+        metaTableTypes.add(new MetaTableType("TABLE"));
+    }
+
+    public static class KMetaProject implements NamedWithChildren {
+        public final String projectName;
+        public final List<KMetaCatalog> catalogs;
+
+        public KMetaProject(String projectName, List<KMetaCatalog> catalogs) {
+            this.projectName = projectName;
+            this.catalogs = catalogs;
+        }
+
+        @SuppressWarnings("unchecked")
+        public List<KMetaSchema> getSchemas(String catalog, Pat schemaPattern) {
+            return (List<KMetaSchema>) searchByPatterns(this, Pat.of(catalog), schemaPattern);
+        }
+
+        @SuppressWarnings("unchecked")
+        public List<KMetaTable> getTables(String catalog, Pat schemaPattern, Pat tableNamePattern, List<String> typeList) {
+            return (List<KMetaTable>) searchByPatterns(this, Pat.of(catalog), schemaPattern, tableNamePattern);
+        }
+        
+        @SuppressWarnings("unchecked")
+        public List<KMetaColumn> getColumns(String catalog, Pat schemaPattern, Pat tableNamePattern, Pat columnNamePattern) {
+            return (List<KMetaColumn>) searchByPatterns(this, Pat.of(catalog), schemaPattern, tableNamePattern, columnNamePattern);
+        }
+        
+        @Override
+        public String getName() {
+            return projectName;
+        }
+
+        @Override
+        public List<? extends NamedWithChildren> getChildren() {
+            return catalogs;
+        }
+    }
+
+    public static class KMetaCatalog extends MetaCatalog implements NamedWithChildren {
+        public final List<KMetaSchema> schemas;
+
+        public KMetaCatalog(String tableCatalog, List<KMetaSchema> schemas) {
+            super(tableCatalog);
+            this.schemas = schemas;
+        }
+
+        @Override
+        public List<? extends NamedWithChildren> getChildren() {
+            return schemas;
+        }
+    }
+
+    public static class KMetaSchema extends MetaSchema implements NamedWithChildren {
+        public final List<KMetaTable> tables;
+
+        public KMetaSchema(String tableCatalog, String tableSchem, List<KMetaTable> tables) {
+            super(tableCatalog, tableSchem);
+            this.tables = tables;
+        }
+
+        @Override
+        public List<? extends NamedWithChildren> getChildren() {
+            return tables;
+        }
+    }
+
+    public static class KMetaTable extends MetaTable implements NamedWithChildren {
+        public final List<KMetaColumn> columns;
+
+        public KMetaTable(String tableCat, String tableSchem, String tableName, String tableType, List<KMetaColumn> columns) {
+            super(tableCat, tableSchem, tableName, tableType);
+            this.columns = columns;
+        }
+
+        @Override
+        public List<? extends NamedWithChildren> getChildren() {
+            return columns;
+        }
+    }
+
+    public static class KMetaColumn extends MetaColumn implements NamedWithChildren {
+
+        public KMetaColumn(String tableCat, String tableSchem, String tableName, String columnName, int dataType, String typeName, int columnSize, Integer decimalDigits, int numPrecRadix, int nullable, int charOctetLength, int ordinalPosition, String isNullable) {
+            super(tableCat, tableSchem, tableName, columnName, dataType, typeName, columnSize, decimalDigits, numPrecRadix, nullable, charOctetLength, ordinalPosition, isNullable);
+        }
+
+        @Override
+        public List<NamedWithChildren> getChildren() {
+            return Collections.<NamedWithChildren> emptyList();
+        }
+    }
+}