You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/18 09:56:47 UTC

[2/3] TAJO-176: Implement Tajo JDBC Driver. (Keuntae Park via jihoon)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
new file mode 100644
index 0000000..6c64ac2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
@@ -0,0 +1,1196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
+
+import java.sql.*;
+import java.util.*;
+
+/**
+ * TajoDatabaseMetaData.
+ */
+public class TajoDatabaseMetaData implements DatabaseMetaData {
+  private static final char SEARCH_STRING_ESCAPE = '\\';
+
+  private final TajoConnection conn;
+
+  public TajoDatabaseMetaData(TajoConnection conn) {
+    this.conn = conn;
+  }
+
+  @Override
+  public boolean allProceduresAreCallable()
+      throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean allTablesAreSelectable()
+      throws SQLException {
+    return true;
+  }
+
+  @Override
+  public String getURL()
+      throws SQLException {
+    return conn.getUri();
+  }
+
+  @Override
+  public String getUserName()
+      throws SQLException {
+    return "tajo";
+  }
+
+  @Override
+  public boolean isReadOnly()
+      throws SQLException {
+    return true;
+  }
+
+  @Override
+  public String getDatabaseProductName()
+      throws SQLException {
+    return "Tajo";
+  }
+
+  @Override
+  public String getDatabaseProductVersion()
+      throws SQLException {
+    //TODO get from tajo master
+    return TajoConstants.TAJO_VERSION;
+  }
+
+  @Override
+  public String getDriverName()
+      throws SQLException {
+    return "tajo";
+  }
+
+  @Override
+  public String getDriverVersion()
+      throws SQLException {
+    return TajoDriver.MAJOR_VERSION + "." + TajoDriver.MINOR_VERSION;
+  }
+
+  @Override
+  public int getDriverMajorVersion() {
+    return TajoDriver.MAJOR_VERSION;
+  }
+
+  @Override
+  public int getDriverMinorVersion() {
+    return TajoDriver.MINOR_VERSION;
+  }
+
+  @Override
+  public String getIdentifierQuoteString()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getIdentifierQuoteString not supported");
+  }
+
+  @Override
+  public String getSQLKeywords()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getSQLKeywords not supported");
+  }
+
+  @Override
+  public String getNumericFunctions()
+      throws SQLException {
+    return "";
+  }
+
+  @Override
+  public String getStringFunctions()
+      throws SQLException {
+    return "";
+  }
+
+  @Override
+  public String getSystemFunctions()
+      throws SQLException {
+    return "";
+  }
+
+  @Override
+  public String getTimeDateFunctions()
+      throws SQLException {
+    return "";
+  }
+
+  @Override
+  public String getSearchStringEscape()
+      throws SQLException {
+    return "\\";
+  }
+
+  @Override
+  public String getExtraNameCharacters()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getExtraNameCharacters not supported");
+  }
+
+  @Override
+  public String getSchemaTerm()
+      throws SQLException {
+    return "";
+  }
+
+  @Override
+  public String getProcedureTerm()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getProcedureTerm not supported");
+  }
+
+  @Override
+  public String getCatalogTerm()
+      throws SQLException {
+    return "database";
+  }
+
+  @Override
+  public String getCatalogSeparator()
+      throws SQLException {
+    return ".";
+  }
+
+  @Override
+  public int getMaxBinaryLiteralLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxBinaryLiteralLength not supported");
+  }
+
+  @Override
+  public int getMaxCharLiteralLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxCharLiteralLength not supported");
+  }
+
+  @Override
+  public int getMaxColumnNameLength()
+      throws SQLException {
+    return 128;
+  }
+
+  @Override
+  public int getMaxColumnsInGroupBy()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxColumnsInGroupBy not supported");
+  }
+
+  @Override
+  public int getMaxColumnsInIndex()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxColumnsInIndex not supported");
+  }
+
+  @Override
+  public int getMaxColumnsInOrderBy()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxColumnsInOrderBy not supported");
+  }
+
+  @Override
+  public int getMaxColumnsInSelect()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxColumnsInSelect not supported");
+  }
+
+  @Override
+  public int getMaxColumnsInTable()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxColumnsInTable not supported");
+  }
+
+  @Override
+  public int getMaxConnections()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxConnections not supported");
+  }
+
+  @Override
+  public int getMaxCursorNameLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxCursorNameLength not supported");
+  }
+
+  @Override
+  public int getMaxIndexLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxIndexLength not supported");
+  }
+
+  @Override
+  public int getMaxSchemaNameLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxSchemaNameLength not supported");
+  }
+
+  @Override
+  public int getMaxProcedureNameLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxProcedureNameLength not supported");
+  }
+
+  @Override
+  public int getMaxCatalogNameLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxCatalogNameLength not supported");
+  }
+
+  @Override
+  public int getMaxRowSize()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxRowSize not supported");
+  }
+
+  @Override
+  public boolean doesMaxRowSizeIncludeBlobs()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("doesMaxRowSizeIncludeBlobs not supported");
+  }
+
+  @Override
+  public int getMaxStatementLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxStatementLength not supported");
+  }
+
+  @Override
+  public int getMaxStatements()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxStatements not supported");
+  }
+
+  @Override
+  public int getMaxTableNameLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxTableNameLength not supported");
+  }
+
+  @Override
+  public int getMaxTablesInSelect()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxTablesInSelect not supported");
+  }
+
+  @Override
+  public int getMaxUserNameLength()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxUserNameLength not supported");
+  }
+
+  @Override
+  public int getDefaultTransactionIsolation()
+      throws SQLException {
+    return Connection.TRANSACTION_NONE;
+  }
+
+  @Override
+  public boolean dataDefinitionCausesTransactionCommit()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("dataDefinitionCausesTransactionCommit not supported");
+  }
+
+  @Override
+  public boolean dataDefinitionIgnoredInTransactions()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("dataDefinitionIgnoredInTransactions not supported");
+  }
+
+  @Override
+  public ResultSet getProcedures(String catalog, String schemaPattern, String procedureNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("stored procedures not supported");
+  }
+
+  @Override
+  public ResultSet getProcedureColumns(String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("stored procedures not supported");
+  }
+
+  /**
+   * Convert a pattern containing JDBC catalog search wildcards into
+   * Java regex patterns.
+   *
+   * @param pattern input which may contain '%' or '_' wildcard characters, or
+   * these characters escaped using {@link #getSearchStringEscape()}.
+   * @return replace %/_ with regex search characters, also handle escaped
+   * characters.
+   */
+  private String convertPattern(final String pattern) {
+    if (pattern == null) {
+      return ".*";
+    } else {
+      StringBuilder result = new StringBuilder(pattern.length());
+
+      boolean escaped = false;
+      for (int i = 0, len = pattern.length(); i < len; i++) {
+        char c = pattern.charAt(i);
+        if (escaped) {
+          if (c != SEARCH_STRING_ESCAPE) {
+            escaped = false;
+          }
+          result.append(c);
+        } else {
+          if (c == SEARCH_STRING_ESCAPE) {
+            escaped = true;
+            continue;
+          } else if (c == '%') {
+            result.append(".*");
+          } else if (c == '_') {
+            result.append('.');
+          } else {
+            result.append(Character.toLowerCase(c));
+          }
+        }
+      }
+
+      return result.toString();
+    }
+  }
+
+  @Override
+  public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types)
+      throws SQLException {
+    try {
+      final List<MetaDataTuple> resultTables = new ArrayList<MetaDataTuple>();
+      final String resultCatalog;
+      if (catalog == null) {
+        resultCatalog = "default";
+      } else {
+        resultCatalog = catalog;
+      }
+
+      String regtableNamePattern = convertPattern(tableNamePattern);
+      try {
+        TajoClient tajoClient = conn.getTajoClient();
+        List<String> tableNames = tajoClient.getTableList();
+        for (String eachTableName: tableNames) {
+          if (eachTableName.matches(regtableNamePattern)) {
+            MetaDataTuple tuple = new MetaDataTuple(5);
+
+            int index = 0;
+            tuple.put(index++, new TextDatum(resultCatalog));  //TABLE_CAT
+            tuple.put(index++, NullDatum.get());   //TABLE_SCHEM
+            tuple.put(index++, new TextDatum(eachTableName));
+            tuple.put(index++, new TextDatum("TABLE"));   //TABLE_TYPE
+            tuple.put(index++, NullDatum.get());   //REMARKS
+
+            resultTables.add(tuple);
+          }
+        }
+        Collections.sort(resultTables, new Comparator<MetaDataTuple> () {
+          @Override
+          public int compare(MetaDataTuple table1, MetaDataTuple table2) {
+            return table1.getString(2).compareTo(table2.getString(2));
+          }
+        });
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw new SQLException(e);
+      }
+      TajoMetaDataResultSet result = new TajoMetaDataResultSet(
+          Arrays.asList("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "TABLE_TYPE", "REMARKS"),
+          Arrays.asList(Type.VARCHAR, Type.VARCHAR, Type.VARCHAR, Type.VARCHAR, Type.VARCHAR),
+          resultTables);
+
+      return result;
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new SQLException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public ResultSet getSchemas()
+      throws SQLException {
+    return getSchemas(null, null);
+  }
+
+  @Override
+  public ResultSet getCatalogs()
+      throws SQLException {
+    List<MetaDataTuple> columns = new ArrayList<MetaDataTuple>();
+    MetaDataTuple tuple = new MetaDataTuple(1);
+    tuple.put(0, new TextDatum("default"));
+    columns.add(tuple);
+
+    ResultSet result = new TajoMetaDataResultSet(
+        Arrays.asList("TABLE_CAT")
+        , Arrays.asList(Type.VARCHAR)
+        , columns);
+
+    return result;
+  }
+
+  @Override
+  public ResultSet getTableTypes()
+      throws SQLException {
+    List<MetaDataTuple> columns = new ArrayList<MetaDataTuple>();
+    MetaDataTuple tuple = new MetaDataTuple(1);
+    tuple.put(0, new TextDatum("TABLE"));
+    columns.add(tuple);
+
+    ResultSet result = new TajoMetaDataResultSet(
+        Arrays.asList("TABLE_TYPE")
+        , Arrays.asList(Type.VARCHAR)
+        , columns);
+
+    return result;
+  }
+
+  @Override
+  public ResultSet getUDTs(String catalog, String schemaPattern, String typeNamePattern, int[] types)
+      throws SQLException {
+    List<MetaDataTuple> columns = new ArrayList<MetaDataTuple>();
+
+    return new TajoMetaDataResultSet(
+        Arrays.asList("TYPE_CAT", "TYPE_SCHEM", "TYPE_NAME", "CLASS_NAME", "DATA_TYPE"
+            , "REMARKS", "BASE_TYPE")
+        , Arrays.asList(Type.VARCHAR, Type.VARCHAR, Type.VARCHAR, Type.VARCHAR, Type.INT4, Type.VARCHAR, Type.INT4)
+        , columns);
+  }
+
+  @Override
+  public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+      throws SQLException {
+    List<MetaDataTuple> columns = new ArrayList<MetaDataTuple>();
+    try {
+      if (catalog == null) {
+        catalog = "default";
+      }
+
+      String regtableNamePattern = convertPattern(tableNamePattern);
+      String regcolumnNamePattern = convertPattern(columnNamePattern);
+
+      List<String> tables = conn.getTajoClient().getTableList();
+      for (String table: tables) {
+        if (table.matches(regtableNamePattern)) {
+          TableDesc tableDesc = conn.getTajoClient().getTableDesc(table);
+          int pos = 0;
+          for (Column column: tableDesc.getSchema().getColumns()) {
+            if (column.getColumnName().matches(regcolumnNamePattern)) {
+              MetaDataTuple tuple = new MetaDataTuple(22);
+
+              int index = 0;
+              tuple.put(index++, new TextDatum(catalog));  //TABLE_CAT
+              tuple.put(index++, NullDatum.get());  //TABLE_SCHEM
+              tuple.put(index++, new TextDatum(table));  //TABLE_NAME
+              tuple.put(index++, new TextDatum(column.getColumnName()));  //COLUMN_NAME
+              tuple.put(index++, new TextDatum("" + TajoDriver.tajoTypeToSqlType(column.getDataType())));  //TODO DATA_TYPE
+              tuple.put(index++, new TextDatum(TajoDriver.toSqlType(column.getDataType())));  //TYPE_NAME
+              tuple.put(index++, new TextDatum("0"));  //COLUMN_SIZE
+              tuple.put(index++, new TextDatum("0"));  //BUFFER_LENGTH
+              tuple.put(index++, new TextDatum("0"));  //DECIMAL_DIGITS
+              tuple.put(index++, new TextDatum("0"));  //NUM_PREC_RADIX
+              tuple.put(index++, new TextDatum("" + DatabaseMetaData.columnNullable));  //NULLABLE
+              tuple.put(index++, NullDatum.get());  //REMARKS
+              tuple.put(index++, NullDatum.get());  //COLUMN_DEF
+              tuple.put(index++, NullDatum.get());  //SQL_DATA_TYPE
+              tuple.put(index++, NullDatum.get());  //SQL_DATETIME_SUB
+              tuple.put(index++, new TextDatum("0"));  //CHAR_OCTET_LENGTH
+              tuple.put(index++, new TextDatum("" + pos));  //ORDINAL_POSITION
+              tuple.put(index++, new TextDatum("YES"));  //IS_NULLABLE
+              tuple.put(index++, NullDatum.get());  //SCOPE_CATLOG
+              tuple.put(index++, NullDatum.get());  //SCOPE_SCHEMA
+              tuple.put(index++, NullDatum.get());  //SCOPE_TABLE
+              tuple.put(index++, new TextDatum("0"));  //SOURCE_DATA_TYPE
+              columns.add(tuple);
+            }
+            pos++;
+          }
+        }
+      }
+
+      return new TajoMetaDataResultSet(
+          Arrays.asList("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "DATA_TYPE"
+              , "TYPE_NAME", "COLUMN_SIZE", "BUFFER_LENGTH", "DECIMAL_DIGITS", "NUM_PREC_RADIX"
+              , "NULLABLE", "REMARKS", "COLUMN_DEF", "SQL_DATA_TYPE", "SQL_DATETIME_SUB"
+              , "CHAR_OCTET_LENGTH", "ORDINAL_POSITION", "IS_NULLABLE", "SCOPE_CATLOG", "SCOPE_SCHEMA"
+              , "SCOPE_TABLE", "SOURCE_DATA_TYPE")
+          , Arrays.asList(Type.VARCHAR, Type.VARCHAR, Type.VARCHAR, Type.VARCHAR, Type.INT4
+              , Type.VARCHAR, Type.INT4, Type.INT4, Type.INT4, Type.INT4
+              , Type.INT4, Type.VARCHAR, Type.VARCHAR, Type.INT4, Type.INT4
+              , Type.INT4, Type.INT4, Type.VARCHAR, Type.VARCHAR, Type.VARCHAR
+              , Type.VARCHAR, Type.INT4)
+          , columns);
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public ResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("privileges not supported");
+  }
+
+  @Override
+  public ResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("privileges not supported");
+  }
+
+  @Override
+  public ResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("row identifiers not supported");
+  }
+
+  @Override
+  public ResultSet getVersionColumns(String catalog, String schema, String table)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("version columns not supported");
+  }
+
+  @Override
+  public ResultSet getPrimaryKeys(String catalog, String schema, String table)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("primary keys not supported");
+  }
+
+  @Override
+  public ResultSet getImportedKeys(String catalog, String schema, String table)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("imported keys not supported");
+  }
+
+  @Override
+  public ResultSet getExportedKeys(String catalog, String schema, String table)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("exported keys not supported");
+  }
+
+  @Override
+  public ResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTable, String foreignCatalog, String foreignSchema, String foreignTable)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("cross reference not supported");
+  }
+
+  @Override
+  public ResultSet getTypeInfo()
+      throws SQLException {
+    throw new UnsupportedOperationException("getTypeInfo not supported");
+  }
+
+  @Override
+  public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("indexes not supported");
+  }
+
+  @Override
+  public boolean deletesAreDetected(int type)
+      throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean insertsAreDetected(int type)
+      throws SQLException {
+    return false;
+  }
+
+  @Override
+  public Connection getConnection()
+      throws SQLException {
+    return conn;
+  }
+
+  @Override
+  public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("type hierarchies not supported");
+  }
+
+  @Override
+  public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("type hierarchies not supported");
+  }
+
+  @Override
+  public ResultSet getAttributes(String catalog, String schemaPattern, String typeNamePattern, String attributeNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("user-defined types not supported");
+  }
+
+  @Override
+  public int getResultSetHoldability()
+      throws SQLException {
+    return ResultSet.HOLD_CURSORS_OVER_COMMIT;
+  }
+
+  @Override
+  public int getDatabaseMajorVersion()
+      throws SQLException {
+    return TajoDriver.MAJOR_VERSION;
+  }
+
+  @Override
+  public int getDatabaseMinorVersion()
+      throws SQLException {
+    return TajoDriver.MINOR_VERSION;
+  }
+
+  @Override
+  public int getJDBCMajorVersion()
+      throws SQLException {
+    return TajoDriver.JDBC_VERSION_MAJOR;
+  }
+
+  @Override
+  public int getJDBCMinorVersion()
+      throws SQLException {
+    return TajoDriver.JDBC_VERSION_MINOR;
+  }
+
+  @Override
+  public int getSQLStateType()
+      throws SQLException {
+    return DatabaseMetaData.sqlStateSQL;
+  }
+
+  @Override
+  public RowIdLifetime getRowIdLifetime()
+      throws SQLException {
+    return RowIdLifetime.ROWID_UNSUPPORTED;
+  }
+
+  @Override
+  public ResultSet getSchemas(String catalog, String schemaPattern)
+      throws SQLException {
+    return new TajoMetaDataResultSet(
+        Arrays.asList("TABLE_SCHEM", "TABLE_CATALOG"),
+        Arrays.asList(Type.VARCHAR, Type.VARCHAR),
+        null);
+  }
+
+  @Override
+  public boolean autoCommitFailureClosesAllResultSets()
+      throws SQLException {
+    return false;
+  }
+
+  @Override
+  public ResultSet getClientInfoProperties()
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getClientInfoProperties not supported");
+  }
+
+  @Override
+  public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getFunctions not supported");
+  }
+
+  @Override
+  public ResultSet getFunctionColumns(String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("getFunctionColumns not supported");
+  }
+
+  @Override
+  public boolean isCatalogAtStart() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean locatorsUpdateCopy() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean nullPlusNonNullIsNull() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean nullsAreSortedAtEnd() throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean nullsAreSortedAtStart() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean nullsAreSortedHigh() throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean nullsAreSortedLow() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean othersDeletesAreVisible(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean othersInsertsAreVisible(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean othersUpdatesAreVisible(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean ownDeletesAreVisible(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean ownInsertsAreVisible(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean ownUpdatesAreVisible(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean storesLowerCaseIdentifiers() throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean storesLowerCaseQuotedIdentifiers() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean storesMixedCaseIdentifiers() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean storesMixedCaseQuotedIdentifiers() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean storesUpperCaseIdentifiers() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean storesUpperCaseQuotedIdentifiers() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsANSI92EntryLevelSQL() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsANSI92FullSQL() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsANSI92IntermediateSQL() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAlterTableWithAddColumn() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAlterTableWithDropColumn() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsBatchUpdates() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsCatalogsInDataManipulation() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsCatalogsInIndexDefinitions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsCatalogsInProcedureCalls() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsCatalogsInTableDefinitions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsColumnAliasing() throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean supportsConvert() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsConvert(int fromType, int toType) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsCoreSQLGrammar() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsCorrelatedSubqueries() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsDataDefinitionAndDataManipulationTransactions()
+      throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsDataManipulationTransactionsOnly() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsDifferentTableCorrelationNames() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsExpressionsInOrderBy() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsExtendedSQLGrammar() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsFullOuterJoins() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsGetGeneratedKeys() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsGroupBy() throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean supportsGroupByBeyondSelect() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsGroupByUnrelated() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsIntegrityEnhancementFacility() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsLikeEscapeClause() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsLimitedOuterJoins() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsMinimumSQLGrammar() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsMixedCaseIdentifiers() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsMultipleOpenResults() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsMultipleResultSets() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsMultipleTransactions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsNamedParameters() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsNonNullableColumns() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsOpenCursorsAcrossCommit() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsOpenCursorsAcrossRollback() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsOpenStatementsAcrossCommit() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsOpenStatementsAcrossRollback() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsOrderByUnrelated() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsOuterJoins() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsPositionedDelete() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsPositionedUpdate() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsResultSetConcurrency(int type, int concurrency)
+      throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsResultSetHoldability(int holdability)
+      throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsResultSetType(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSavepoints() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSchemasInDataManipulation() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSchemasInIndexDefinitions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSchemasInProcedureCalls() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSchemasInTableDefinitions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSelectForUpdate() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsStatementPooling() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsStoredProcedures() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSubqueriesInComparisons() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSubqueriesInExists() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSubqueriesInIns() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsSubqueriesInQuantifieds() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsTableCorrelationNames() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsTransactionIsolationLevel(int level)
+      throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsTransactions() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsUnion() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean supportsUnionAll() throws SQLException {
+    return true;
+  }
+
+  @Override
+  public boolean updatesAreDetected(int type) throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean usesLocalFilePerTable() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public boolean usesLocalFiles() throws SQLException {
+    return false;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> T unwrap(Class<T> iface)
+      throws SQLException {
+    if (isWrapperFor(iface)) {
+      return (T) this;
+    }
+    throw new SQLFeatureNotSupportedException("No wrapper for " + iface);
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface)
+      throws SQLException {
+    return iface.isInstance(this);
+  }
+
+  public boolean generatedKeyAlwaysReturned() throws SQLException {
+    // JDK 1.7
+    throw new SQLFeatureNotSupportedException("generatedKeyAlwaysReturned not supported");
+  }
+
+  public ResultSet getPseudoColumns(String catalog, String schemaPattern,
+                                    String tableNamePattern, String columnNamePattern) throws SQLException {
+    // JDK 1.7
+    throw new SQLFeatureNotSupportedException("getPseudoColumns not supported");
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDriver.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDriver.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDriver.java
new file mode 100644
index 0000000..ca0502f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoDriver.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.UnsupportedException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.*;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+public class TajoDriver implements Driver, Closeable {
+  public static final int MAJOR_VERSION = 1;
+  public static final int MINOR_VERSION = 0;
+
+  public static final int JDBC_VERSION_MAJOR = 4;
+  public static final int JDBC_VERSION_MINOR = 0;
+
+  public static final String TAJO_JDBC_URL_PREFIX = "jdbc:tajo://";
+
+  protected static TajoConf jdbcTajoConf;
+
+  static {
+    try {
+      java.sql.DriverManager.registerDriver(new TajoDriver());
+    } catch (SQLException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public TajoDriver() {
+    jdbcTajoConf = new TajoConf();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public Connection connect(String url, Properties properties) throws SQLException {
+    return new TajoConnection(url, properties);
+  }
+
+  @Override
+  public boolean acceptsURL(String url) throws SQLException {
+    return url.startsWith(TAJO_JDBC_URL_PREFIX);
+  }
+
+  @Override
+  public DriverPropertyInfo[] getPropertyInfo(String s, Properties properties) throws SQLException {
+    return new DriverPropertyInfo[0];
+  }
+
+  @Override
+  public int getMajorVersion() {
+    return MAJOR_VERSION;
+  }
+
+  @Override
+  public int getMinorVersion() {
+    return MINOR_VERSION;
+  }
+
+  @Override
+  public boolean jdbcCompliant() {
+    return false;
+  }
+
+  public static String toSqlType(TajoDataTypes.DataType type) {
+    switch (type.getType()) {
+      case BOOLEAN:
+        return "boolean";
+      case INT1:
+        return "tinyint";
+      case INT2:
+        return "smallint";
+      case INT4:
+        return "integer";
+      case INT8:
+        return "bigint";
+      case FLOAT4:
+        return "float";
+      case FLOAT8:
+        return "float8";
+      case DECIMAL:
+        return "numeric";
+      case VARBINARY:
+        return "bytea";
+      case CHAR:
+        return "character";
+      case DATE:
+        return "date";
+      case VARCHAR:
+        return "varchar";
+      case TEXT:
+        return "varchar";
+      default:
+        throw new UnsupportedException("Unrecognized column type:" + type);
+    }
+  }
+
+  public static int tajoTypeToSqlType(TajoDataTypes.DataType type) throws SQLException {
+    switch (type.getType()) {
+      case BOOLEAN:
+        return Types.BOOLEAN;
+      case INT1:
+        return Types.TINYINT;
+      case INT2:
+        return Types.SMALLINT;
+      case INT4:
+        return Types.INTEGER;
+      case INT8:
+        return Types.BIGINT;
+      case FLOAT4:
+        return Types.FLOAT;
+      case FLOAT8:
+        return Types.DOUBLE;
+      case DECIMAL:
+        return Types.DECIMAL;
+      case DATE:
+        return Types.TIMESTAMP;
+      case VARCHAR:
+        return Types.VARCHAR;
+      case TEXT:
+        return Types.VARCHAR;
+      default:
+        throw new SQLException("Unrecognized column type: " + type);
+    }
+  }
+
+  static int columnDisplaySize(int columnType) throws SQLException {
+    // according to hiveTypeToSqlType possible options are:
+    switch(columnType) {
+      case Types.BOOLEAN:
+        return columnPrecision(columnType);
+      case Types.VARCHAR:
+        return Integer.MAX_VALUE; // hive has no max limit for strings
+      case Types.TINYINT:
+      case Types.SMALLINT:
+      case Types.INTEGER:
+      case Types.BIGINT:
+        return columnPrecision(columnType) + 1; // allow +/-
+      case Types.TIMESTAMP:
+        return columnPrecision(columnType);
+      // see http://download.oracle.com/javase/6/docs/api/constant-values.html#java.lang.Float.MAX_EXPONENT
+      case Types.FLOAT:
+        return 24; // e.g. -(17#).e-###
+      // see http://download.oracle.com/javase/6/docs/api/constant-values.html#java.lang.Double.MAX_EXPONENT
+      case Types.DOUBLE:
+        return 25; // e.g. -(17#).e-####
+      case Types.DECIMAL:
+        return Integer.MAX_VALUE;
+      default:
+        throw new SQLException("Invalid column type: " + columnType);
+    }
+  }
+
+  static int columnPrecision(int columnType) throws SQLException {
+    // according to hiveTypeToSqlType possible options are:
+    switch(columnType) {
+      case Types.BOOLEAN:
+        return 1;
+      case Types.VARCHAR:
+        return Integer.MAX_VALUE; // hive has no max limit for strings
+      case Types.TINYINT:
+        return 3;
+      case Types.SMALLINT:
+        return 5;
+      case Types.INTEGER:
+        return 10;
+      case Types.BIGINT:
+        return 19;
+      case Types.FLOAT:
+        return 7;
+      case Types.DOUBLE:
+        return 15;
+      case Types.TIMESTAMP:
+        return 29;
+      case Types.DECIMAL:
+        return Integer.MAX_VALUE;
+      default:
+        throw new SQLException("Invalid column type: " + columnType);
+    }
+  }
+
+  static int columnScale(int columnType) throws SQLException {
+    // according to hiveTypeToSqlType possible options are:
+    switch(columnType) {
+      case Types.BOOLEAN:
+      case Types.VARCHAR:
+      case Types.TINYINT:
+      case Types.SMALLINT:
+      case Types.INTEGER:
+      case Types.BIGINT:
+        return 0;
+      case Types.FLOAT:
+        return 7;
+      case Types.DOUBLE:
+        return 15;
+      case Types.TIMESTAMP:
+        return 9;
+      case Types.DECIMAL:
+        return Integer.MAX_VALUE;
+      default:
+        throw new SQLException("Invalid column type: " + columnType);
+    }
+  }
+
+  public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+    // JDK 1.7
+    throw new SQLFeatureNotSupportedException("getParentLogger not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
new file mode 100644
index 0000000..1e75424
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+public class TajoMetaDataResultSet extends TajoResultSetBase {
+  private List<MetaDataTuple> values;
+
+  public TajoMetaDataResultSet(List<String> columns, List<Type> types, List<MetaDataTuple> values) {
+    init();
+    schema = new Schema();
+
+    int index = 0;
+    if(columns != null) {
+      for(String columnName: columns) {
+        schema.addColumn(columnName, types.get(index++));
+      }
+    }
+    this.values = values;
+    totalRow = values == null ? 0 : values.size();
+  }
+
+  @Override
+  protected Tuple nextTuple() throws IOException {
+    if(curRow >= totalRow) {
+      return null;
+    }
+    return values.get(curRow);
+  }
+
+  @Override
+  public void close() throws SQLException {
+  }
+
+  @Override
+  public String getString(int fieldId) throws SQLException {
+    Datum datum = cur.get(fieldId - 1);
+    if(datum == null) {
+      return null;
+    }
+
+    return datum.asChars();
+  }
+
+  @Override
+  public String getString(String name) throws SQLException {
+    Datum datum = cur.get(findColumn(name));
+    if(datum == null) {
+      return null;
+    }
+    return datum.asChars();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
new file mode 100644
index 0000000..a6e3bbf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
@@ -0,0 +1,660 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.client.TajoClient;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.*;
+import java.util.Calendar;
+import java.util.HashMap;
+
+/**
+ * TajoPreparedStatement.
+ *
+ */
+public class TajoPreparedStatement implements PreparedStatement {
+  private final String sql;
+  private TajoClient tajoClient;
+  /**
+   * save the SQL parameters {paramLoc:paramValue}
+   */
+  private final HashMap<Integer, String> parameters=new HashMap<Integer, String>();
+
+  /**
+   * We need to keep a reference to the result set to support the following:
+   * <code>
+   * statement.execute(String sql);
+   * statement.getResultSet();
+   * </code>.
+   */
+  private ResultSet resultSet = null;
+
+  /**
+   * Add SQLWarnings to the warningChain if needed.
+   */
+  private  SQLWarning warningChain = null;
+
+  /**
+   * Keep state so we can fail certain calls made after close().
+   */
+  private boolean isClosed = false;
+
+  /**
+   * keep the current ResultRet update count
+   */
+  private final int updateCount = 0;
+
+  /**
+   *
+   */
+  public TajoPreparedStatement(TajoClient tajoClient,
+                               String sql) {
+    this.tajoClient = tajoClient;
+    this.sql = sql;
+  }
+
+  @Override
+  public void addBatch() throws SQLException {
+    throw new SQLFeatureNotSupportedException("addBatch");
+  }
+
+  @Override
+  public void clearParameters() throws SQLException {
+    this.parameters.clear();
+  }
+
+  @Override
+  public boolean execute() throws SQLException {
+    ResultSet rs = executeImmediate(sql);
+    return rs != null;
+  }
+
+  @Override
+  public ResultSet executeQuery() throws SQLException {
+    return executeImmediate(sql);
+  }
+
+  @Override
+  public int executeUpdate() throws SQLException {
+    executeImmediate(sql);
+    return updateCount;
+  }
+
+  protected ResultSet executeImmediate(String sql) throws SQLException {
+    if (isClosed) {
+      throw new SQLFeatureNotSupportedException("Can't execute after statement has been closed");
+    }
+
+    try {
+      if (sql.contains("?")) {
+        sql = updateSql(sql, parameters);
+      }
+      resultSet = tajoClient.executeQueryAndGetResult(sql);
+    } catch (Exception e) {
+      throw new SQLFeatureNotSupportedException(e.getMessage(), e);
+    }
+    return resultSet;
+  }
+
+  /**
+   * update the SQL string with parameters set by setXXX methods of {@link java.sql.PreparedStatement}
+   *
+   * @param sql
+   * @param parameters
+   * @return updated SQL string
+   */
+  private String updateSql(final String sql, HashMap<Integer, String> parameters) {
+
+    StringBuffer newSql = new StringBuffer(sql);
+
+    int paramLoc = 1;
+    while (getCharIndexFromSqlByParamLocation(sql, '?', paramLoc) > 0) {
+      // check the user has set the needs parameters
+      if (parameters.containsKey(paramLoc)) {
+        int tt = getCharIndexFromSqlByParamLocation(newSql.toString(), '?', 1);
+        newSql.deleteCharAt(tt);
+        newSql.insert(tt, parameters.get(paramLoc));
+      }
+      paramLoc++;
+    }
+
+    return newSql.toString();
+
+  }
+
+  /**
+   * Get the index of given char from the SQL string by parameter location
+   * </br> The -1 will be return, if nothing found
+   *
+   * @param sql
+   * @param cchar
+   * @param paramLoc
+   * @return
+   */
+  private int getCharIndexFromSqlByParamLocation(final String sql, final char cchar, final int paramLoc) {
+    int signalCount = 0;
+    int charIndex = -1;
+    int num = 0;
+    for (int i = 0; i < sql.length(); i++) {
+      char c = sql.charAt(i);
+      if (c == '\'' || c == '\\')// record the count of char "'" and char "\"
+      {
+        signalCount++;
+      } else if (c == cchar && signalCount % 2 == 0) {// check if the ? is really the parameter
+        num++;
+        if (num == paramLoc) {
+          charIndex = i;
+          break;
+        }
+      }
+    }
+    return charIndex;
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    if(resultSet != null) {
+      return resultSet.getMetaData();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public ParameterMetaData getParameterMetaData() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getParameterMetaData not supported");
+  }
+
+  @Override
+  public void setArray(int i, Array x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setArray not supported");
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setAsciiStream not supported");
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setAsciiStream not supported");
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setAsciiStream not supported");
+  }
+
+  @Override
+  public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBigDecimal not supported");
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBinaryStream not supported");
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBinaryStream not supported");
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBinaryStream not supported");
+  }
+
+  @Override
+  public void setBlob(int i, Blob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBlob not supported");
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBlob not supported");
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream, long length)
+          throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBlob not supported");
+  }
+
+  @Override
+  public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+    this.parameters.put(parameterIndex, "" + x);
+  }
+
+  @Override
+  public void setByte(int parameterIndex, byte x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setByte not supported");
+  }
+
+  @Override
+  public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setBytes not supported");
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setCharacterStream not supported");
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("setCharacterStream not supported");
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("setCharacterStream not supported");
+  }
+
+  @Override
+  public void setClob(int i, Clob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setClob not supported");
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setClob not supported");
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setClob not supported");
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setDate not supported");
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setDate not supported");
+  }
+
+  @Override
+  public void setDouble(int parameterIndex, double x) throws SQLException {
+    this.parameters.put(parameterIndex,"" + x);
+  }
+
+  @Override
+  public void setFloat(int parameterIndex, float x) throws SQLException {
+    this.parameters.put(parameterIndex,"" + x);
+  }
+
+  @Override
+  public void setInt(int parameterIndex, int x) throws SQLException {
+    this.parameters.put(parameterIndex,"" + x);
+  }
+
+  @Override
+  public void setLong(int parameterIndex, long x) throws SQLException {
+    this.parameters.put(parameterIndex,"" + x);
+  }
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNCharacterStream not supported");
+  }
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNCharacterStream not supported");
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, NClob value) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNClob not supported");
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNClob not supported");
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNClob not supported");
+  }
+
+  @Override
+  public void setNString(int parameterIndex, String value) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNString not supported");
+  }
+
+  @Override
+  public void setNull(int parameterIndex, int sqlType) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNull not supported");
+  }
+
+  @Override
+  public void setNull(int paramIndex, int sqlType, String typeName) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setNull not supported");
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setObject not supported");
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x, int targetSqlType)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("setObject not supported");
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x, int targetSqlType, int scale)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("setObject not supported");
+  }
+
+  @Override
+  public void setRef(int i, Ref x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setRef not supported");
+  }
+
+  @Override
+  public void setRowId(int parameterIndex, RowId x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setRowId not supported");
+  }
+
+  @Override
+  public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setSQLXML not supported");
+  }
+
+  @Override
+  public void setShort(int parameterIndex, short x) throws SQLException {
+    this.parameters.put(parameterIndex,"" + x);
+  }
+
+  @Override
+  public void setString(int parameterIndex, String x) throws SQLException {
+     x=x.replace("'", "\\'");
+     this.parameters.put(parameterIndex,"'" + x +"'");
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setTime not supported");
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setTime not supported");
+  }
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setTimestamp not supported");
+  }
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("setTimestamp not supported");
+  }
+
+  @Override
+  public void setURL(int parameterIndex, URL x) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setURL not supported");
+  }
+
+  @Override
+  public void setUnicodeStream(int parameterIndex, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException("setUnicodeStream not supported");
+  }
+
+  @Override
+  public void addBatch(String sql) throws SQLException {
+    throw new SQLFeatureNotSupportedException("addBatch not supported");
+  }
+
+  @Override
+  public void cancel() throws SQLException {
+    throw new SQLFeatureNotSupportedException("cancel not supported");
+  }
+
+  @Override
+  public void clearBatch() throws SQLException {
+    throw new SQLFeatureNotSupportedException("clearBatch not supported");
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+     warningChain=null;
+  }
+
+  public void closeOnCompletion() throws SQLException {
+    // JDK 1.7
+    throw new SQLFeatureNotSupportedException("closeOnCompletion");
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (resultSet!=null) {
+      resultSet.close();
+      resultSet = null;
+    }
+    isClosed = true;
+  }
+
+  @Override
+  public boolean execute(String sql) throws SQLException {
+    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
+  }
+
+  @Override
+  public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
+  }
+
+  @Override
+  public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
+  }
+
+  @Override
+  public boolean execute(String sql, String[] columnNames) throws SQLException {
+    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
+  }
+
+  @Override
+  public int[] executeBatch() throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeBatch not supported");
+  }
+
+  @Override
+  public ResultSet executeQuery(String sql) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeQuery(sql) not supported");
+  }
+
+  @Override
+  public int executeUpdate(String sql) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+  }
+
+  @Override
+  public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+  }
+
+  @Override
+  public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+  }
+
+  @Override
+  public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+  }
+
+  @Override
+  public Connection getConnection() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getConnection not supported");
+  }
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getFetchDirection not supported");
+  }
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getFetchSize not supported");
+  }
+
+  @Override
+  public ResultSet getGeneratedKeys() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported");
+  }
+
+  @Override
+  public int getMaxFieldSize() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported");
+  }
+
+  @Override
+  public int getMaxRows() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMaxRows not supported");
+  }
+
+  @Override
+  public boolean getMoreResults() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+  }
+
+  @Override
+  public boolean getMoreResults(int current) throws SQLException {
+    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+  }
+
+  @Override
+  public int getQueryTimeout() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getQueryTimeout not supported");
+  }
+
+  @Override
+  public ResultSet getResultSet() throws SQLException {
+    return this.resultSet;
+  }
+
+  @Override
+  public int getResultSetConcurrency() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported");
+  }
+
+  @Override
+  public int getResultSetHoldability() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported");
+  }
+
+  @Override
+  public int getResultSetType() throws SQLException {
+    throw new SQLFeatureNotSupportedException("getResultSetType not supported");
+  }
+
+  @Override
+  public int getUpdateCount() throws SQLException {
+    return updateCount;
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return warningChain;
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return isClosed;
+  }
+
+   public boolean isCloseOnCompletion() throws SQLException {
+     //JDK 1.7
+     throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported");
+   }
+
+  @Override
+  public boolean isPoolable() throws SQLException {
+    throw new SQLFeatureNotSupportedException("isPoolable not supported");
+  }
+
+  @Override
+  public void setCursorName(String name) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setCursorName not supported");
+  }
+
+  @Override
+  public void setEscapeProcessing(boolean enable) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setEscapeProcessing not supported");
+  }
+
+  @Override
+  public void setFetchDirection(int direction) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setFetchDirection not supported");
+  }
+
+  @Override
+  public void setFetchSize(int rows) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setFetchSize not supported");
+  }
+
+  @Override
+  public void setMaxFieldSize(int max) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported");
+  }
+
+  @Override
+  public void setMaxRows(int max) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setMaxRows not supported");
+  }
+
+  @Override
+  public void setPoolable(boolean poolable) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setPoolable not supported");
+  }
+
+  @Override
+  public void setQueryTimeout(int seconds) throws SQLException {
+    throw new SQLFeatureNotSupportedException("setQueryTimeout not supported");
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    throw new SQLFeatureNotSupportedException("isWrapperFor not supported");
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    throw new SQLFeatureNotSupportedException("unwrap not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/342fd47f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
new file mode 100644
index 0000000..a50ed2c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.storage.MergeScanner;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+
+public class TajoResultSet extends TajoResultSetBase {
+  private FileSystem fs;
+  private Scanner scanner;
+  private TajoClient tajoClient;
+  QueryId queryId;
+
+  public TajoResultSet(TajoClient tajoClient, QueryId queryId) {
+    this.tajoClient = tajoClient;
+    this.queryId = queryId;
+    init();
+  }
+
+  public TajoResultSet(TajoClient tajoClient, QueryId queryId,
+                       Configuration conf, TableDesc desc) throws IOException {
+    this.schema = desc.getSchema();
+    this.tajoClient = tajoClient;
+    this.queryId = queryId;
+    if(desc != null) {
+      fs = desc.getPath().getFileSystem(conf);
+      this.totalRow = desc.getStats() != null ? desc.getStats().getNumRows() : 0;
+
+      Collection<FileFragment> frags = getFragments(desc.getMeta(), desc.getPath());
+      scanner = new MergeScanner(conf, schema, desc.getMeta(), frags);
+    }
+    init();
+  }
+
+  @Override
+  protected void init() {
+    cur = null;
+    curRow = 0;
+  }
+
+  class FileNameComparator implements Comparator<FileStatus> {
+
+    @Override
+    public int compare(FileStatus f1, FileStatus f2) {
+      return f2.getPath().getName().compareTo(f1.getPath().getName());
+    }
+  }
+
+  private Collection<FileFragment> getFragments(TableMeta meta, Path tablePath)
+      throws IOException {
+    List<FileFragment> fraglist = Lists.newArrayList();
+    FileStatus[] files = fs.listStatus(tablePath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().charAt(0) != '.';
+      }
+    });
+    Arrays.sort(files, new FileNameComparator());
+
+    String tbname = tablePath.getName();
+    for (int i = 0; i < files.length; i++) {
+      if (files[i].getLen() == 0) {
+        continue;
+      }
+      fraglist.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen()));
+    }
+    return fraglist;
+  }
+
+  @Override
+  public void close() throws SQLException {
+    try {
+      if(tajoClient != null) {
+        this.tajoClient.closeQuery(queryId);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    try {
+      if(scanner != null) {
+        this.scanner.close();
+      }
+      //TODO clean temp result file
+      cur = null;
+      curRow = -1;
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void beforeFirst() throws SQLException {
+    try {
+      if(scanner != null) {
+        scanner.reset();
+      }
+      init();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+
+  @Override
+  protected Tuple nextTuple() throws IOException {
+    if(scanner == null) {
+      return null;
+    }
+    return scanner.next();
+  }
+
+  public boolean hasResult() {
+    return scanner != null;
+  }
+}