You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:48 UTC
[47/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/src/main/antlr3/PhoenixSQL.g b/src/main/antlr3/PhoenixSQL.g
new file mode 100644
index 0000000..117ab42
--- /dev/null
+++ b/src/main/antlr3/PhoenixSQL.g
@@ -0,0 +1,1102 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+grammar PhoenixSQL;
+
+// Keyword tokens. Each keyword is declared exactly once; FIRST was previously
+// declared twice, which is redundant at best.
+tokens
+{
+    SELECT='select';
+    FROM='from';
+    WHERE='where';
+    NOT='not';
+    AND='and';
+    OR='or';
+    NULL='null';
+    TRUE='true';
+    FALSE='false';
+    LIKE='like';
+    AS='as';
+    OUTER='outer';
+    ON='on';
+    IN='in';
+    GROUP='group';
+    HAVING='having';
+    ORDER='order';
+    BY='by';
+    ASC='asc';
+    DESC='desc';
+    NULLS='nulls';
+    LIMIT='limit';
+    FIRST='first';
+    LAST='last';
+    CASE='case';
+    WHEN='when';
+    THEN='then';
+    ELSE='else';
+    END='end';
+    EXISTS='exists';
+    IS='is';
+    DISTINCT='distinct';
+    JOIN='join';
+    INNER='inner';
+    LEFT='left';
+    RIGHT='right';
+    FULL='full';
+    BETWEEN='between';
+    UPSERT='upsert';
+    INTO='into';
+    VALUES='values';
+    DELETE='delete';
+    CREATE='create';
+    DROP='drop';
+    PRIMARY='primary';
+    KEY='key';
+    ALTER='alter';
+    COLUMN='column';
+    TABLE='table';
+    ADD='add';
+    SPLIT='split';
+    EXPLAIN='explain';
+    VIEW='view';
+    IF='if';
+    CONSTRAINT='constraint';
+    SHOW='show';
+    TABLES='tables';
+    ALL='all';
+    INDEX='index';
+    INCLUDE='include';
+    WITHIN='within';
+    SET='set';
+    CAST='cast';
+    USABLE='usable';
+    UNUSABLE='unusable';
+    DISABLE='disable';
+    REBUILD='rebuild';
+}
+
+
+@parser::header {
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+///CLOVER:OFF
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.hbase.util.Pair;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Stack;
+import java.sql.SQLException;
+import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.SchemaUtil;
+}
+
+@lexer::header {
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+///CLOVER:OFF
+}
+
+// --------------------------------------
+// The Parser
+
+@parser::members
+{
+
+    /**
+     * Highest bind number assigned or seen so far. Anonymous binds are numbered by
+     * incrementing it (nextBind) and numbered binds can raise it (updateBind).
+     */
+    private int anonBindNum;
+    /** Builds parse nodes for the rule actions; set through setParseNodeFactory. */
+    private ParseNodeFactory factory;
+    /** Stack of ParseContext objects; statement rules push and pop, function terms update the top. */
+    private ParseContext.Stack contextStack = new ParseContext.Stack();
+
+    /** Sets the factory used by the rule actions to build parse nodes. */
+    public void setParseNodeFactory(ParseNodeFactory factory) {
+        this.factory = factory;
+    }
+
+    /** Returns whether the function name, once normalized, is the COUNT aggregate name. */
+    public boolean isCountFunction(String field) {
+        String normalizedName = SchemaUtil.normalizeIdentifier(field);
+        return CountAggregateFunction.NORMALIZED_NAME.equals(normalizedName);
+    }
+
+    /** Returns the line number recorded on the token. */
+    public int line(Token t) {
+        return t.getLine();
+    }
+
+    /** Returns the position of the token within its line, plus one (a 1-based column). */
+    public int column(Token t) {
+        int zeroBased = t.getCharPositionInLine();
+        return zeroBased + 1;
+    }
+
+    /** Throws a RecognitionException positioned at the given token. */
+    private void throwRecognitionException(Token t) throws RecognitionException {
+        RecognitionException failure = new RecognitionException();
+        failure.input = input;
+        failure.token = t;
+        failure.line = t.getLine();
+        failure.charPositionInLine = t.getCharPositionInLine();
+        throw failure;
+    }
+
+    /** Returns the highest bind number used since the last resetBindCount. */
+    public int getBindCount() {
+        return anonBindNum;
+    }
+
+    /** Restarts bind numbering from zero. */
+    public void resetBindCount() {
+        anonBindNum = 0;
+    }
+
+    /** Allocates the next anonymous bind number and returns it as a string. */
+    public String nextBind() {
+        anonBindNum += 1;
+        return String.valueOf(anonBindNum);
+    }
+
+    /**
+     * Records a numbered bind so that later anonymous binds are numbered after it.
+     * @throws NumberFormatException if namedBind is not a valid int
+     */
+    public void updateBind(String namedBind) {
+        int bindNum = Integer.parseInt(namedBind);
+        anonBindNum = Math.max(anonBindNum, bindNum);
+    }
+
+    /**
+     * Disables single-token insertion/deletion recovery: the mismatch is classified as an
+     * unwanted token, a missing token, or a plain mismatch, and thrown immediately.
+     */
+    protected Object recoverFromMismatchedToken(IntStream input, int ttype, BitSet follow)
+            throws RecognitionException {
+        if (mismatchIsUnwantedToken(input, ttype)) {
+            // The expected token comes right after the current one: report the current one as extra.
+            throw new UnwantedTokenException(ttype, input);
+        }
+        if (mismatchIsMissingToken(input, follow)) {
+            Object inserted = getMissingSymbol(input, null, ttype, follow);
+            throw new MissingTokenException(ttype, input, inserted);
+        }
+        throw new MismatchedTokenException(ttype, input);
+    }
+
+    /** Disables set-mismatch recovery by rethrowing the original exception unchanged. */
+    public Object recoverFromMismatchedSet(IntStream input, RecognitionException e, BitSet follow)
+            throws RecognitionException {
+        throw e;
+    }
+
+    /**
+     * Formats syntax errors. Mismatched tokens are described using the PARAPHRASE table,
+     * no-viable-alternative errors with the token position; any other exception falls
+     * back to the superclass message.
+     */
+    @Override
+    public String getErrorMessage(RecognitionException e, String[] tokenNames) {
+        if (e instanceof MismatchedTokenException) {
+            MismatchedTokenException mismatch = (MismatchedTokenException) e;
+            Token offending = mismatch.token;
+            int offendingType = offending.getType();
+            String paraphrase = (offendingType == -1) ? "EOF" : PARAPHRASE[offendingType];
+            String offendingText = offending.getText();
+            String shown = (offendingText == null) ? paraphrase : offendingText;
+            int wanted = mismatch.expecting;
+            String expecting = (wanted >= 0 && wanted < PARAPHRASE.length) ? PARAPHRASE[wanted] : null;
+            if (expecting != null) {
+                return "expecting " + expecting + ", found '" + shown + "'";
+            }
+            return "unexpected token (" + line(offending) + "," + column(offending) + "): " + shown;
+        }
+        if (e instanceof NoViableAltException) {
+            return "unexpected token: (" + line(e.token) + "," + column(e.token) + ")" + getTokenErrorDisplay(e.token);
+        }
+        return super.getErrorMessage(e, tokenNames);
+    }
+
+    /**
+     * Returns the human-readable paraphrase for a token type. The end-of-input type maps
+     * to "EOF" (matching getErrorMessage). Types with no paraphrase, or outside the
+     * PARAPHRASE table, map to "<UNKNOWN>" instead of throwing an index exception.
+     */
+    public String getTokenErrorDisplay(int t) {
+        if (t == Token.EOF) {
+            return "EOF";
+        }
+        if (t < 0 || t >= PARAPHRASE.length || PARAPHRASE[t] == null) {
+            return "<UNKNOWN>";
+        }
+        return PARAPHRASE[t];
+    }
+
+
+    /** Human-readable descriptions of selected token types, indexed by token type, for error messages. */
+    private String[] PARAPHRASE = new String[getTokenNames().length];
+    {
+        // Names and numbers
+        PARAPHRASE[NAME]       = "a field or entity name";
+        PARAPHRASE[NUMBER]     = "a number";
+        // Comparison and punctuation
+        PARAPHRASE[EQ]         = "an equals sign";
+        PARAPHRASE[LT]         = "a left angle bracket";
+        PARAPHRASE[GT]         = "a right angle bracket";
+        PARAPHRASE[COMMA]      = "a comma";
+        PARAPHRASE[LPAREN]     = "a left parentheses";
+        PARAPHRASE[RPAREN]     = "a right parentheses";
+        PARAPHRASE[SEMICOLON]  = "a semi-colon";
+        PARAPHRASE[COLON]      = "a colon";
+        PARAPHRASE[LSQUARE]    = "left square bracket";
+        PARAPHRASE[RSQUARE]    = "right square bracket";
+        PARAPHRASE[LCURLY]     = "left curly bracket";
+        PARAPHRASE[RCURLY]     = "right curly bracket";
+        PARAPHRASE[AT]         = "at";
+        // Arithmetic operators
+        PARAPHRASE[MINUS]      = "a subtraction";
+        PARAPHRASE[TILDE]      = "a tilde";
+        PARAPHRASE[PLUS]       = "an addition";
+        PARAPHRASE[ASTERISK]   = "an asterisk";
+        PARAPHRASE[DIVIDE]     = "a division";
+        // Lexer fragments
+        PARAPHRASE[FIELDCHAR]  = "a field character";
+        PARAPHRASE[LETTER]     = "an ansi letter";
+        PARAPHRASE[POSINTEGER] = "a positive integer";
+        PARAPHRASE[DIGIT]      = "a number from 0 to 9";
+    }
+}
+
+// Every rule rethrows RecognitionException instead of handling it inside the rule,
+// so the first syntax error propagates to the caller of the parser.
+@rulecatch {
+ catch (RecognitionException re) {
+ throw re;
+ }
+}
+
+// No additional lexer members are needed.
+@lexer::members {
+
+}
+
+// Incrementally parses a series of semicolon-terminated SQL statements.
+// Unlike the statement rule, EOF is not required after the statement; when EOF is
+// reached instead of a statement, ret is left unassigned.
+nextStatement returns [BindableStatement ret]
+    :   stmt=oneStatement { $ret = stmt; } SEMICOLON
+    |   EOF
+    ;
+
+// Parses exactly one SQL statement, which must be followed by EOF.
+statement returns [BindableStatement ret]
+    :   stmt=oneStatement { $ret = stmt; } EOF
+    ;
+
+// Parses a SELECT statement that must be the only statement (followed by EOF).
+query returns [SelectStatement ret]
+    :   SELECT sel=hinted_select_node EOF { $ret = sel; }
+    ;
+
+// Parses a single SQL statement of any kind; neither a terminator nor EOF is consumed.
+oneStatement returns [BindableStatement ret]
+    :   SELECT sel=hinted_select_node { $ret = sel; }
+    |   other=non_select_node { $ret = other; }
+    ;
+
+// Parses any statement other than a top-level SELECT. A fresh ParseContext is pushed
+// before the statement and popped once the chosen statement rule has matched.
+non_select_node returns [BindableStatement ret]
+@init{ contextStack.push(new ParseContext()); }
+    :   (   stmt=upsert_node
+        |   stmt=delete_node
+        |   stmt=create_table_node
+        |   stmt=create_index_node
+        |   stmt=drop_table_node
+        |   stmt=drop_index_node
+        |   stmt=alter_index_node
+        |   stmt=alter_table_node
+        |   stmt=explain_node
+        |   stmt=show_tables_node
+        )
+        { contextStack.pop(); $ret = stmt; }
+    ;
+
+// SHOW TABLES
+show_tables_node returns [BindableStatement ret]
+    :   SHOW TABLES { $ret = factory.showTables(); }
+    ;
+
+// EXPLAIN followed by any single statement.
+explain_node returns [BindableStatement ret]
+    :   EXPLAIN stmt=oneStatement { $ret = factory.explain(stmt); }
+    ;
+
+// CREATE {TABLE | VIEW} [IF NOT EXISTS] name (column_defs [pk_constraint])
+//     [family properties] [SPLIT ON (expr, ...)]
+create_table_node returns [CreateTableStatement ret]
+    :   CREATE (isView=VIEW | TABLE) (IF NOT ifNotExists=EXISTS)? tableName=from_table_name
+        (LPAREN cols=column_defs (pkc=pk_constraint)? RPAREN)
+        (props=fam_properties)?
+        (SPLIT ON splits=list_expressions)?
+        {
+            PTableType tableType = (isView == null) ? PTableType.USER : PTableType.VIEW;
+            $ret = factory.createTable(tableName, props, cols, pkc, splits, tableType, ifNotExists != null, getBindCount());
+        }
+    ;
+
+// CREATE INDEX [IF NOT EXISTS] index ON table (col [ASC|DESC], ...)
+//     [INCLUDE (col, ...)] [family properties] [SPLIT ON (expr, ...)]
+create_index_node returns [CreateIndexStatement ret]
+    :   CREATE INDEX (IF NOT ifNotExists=EXISTS)? idx=index_name ON tableName=from_table_name
+        (LPAREN pkc=index_pk_constraint RPAREN)
+        (INCLUDE (LPAREN included=column_names RPAREN))?
+        (props=fam_properties)?
+        (SPLIT ON splits=list_expressions)?
+        {
+            $ret = factory.createIndex(idx, factory.namedTable(null, tableName), pkc, included, splits, props, ifNotExists != null, getBindCount());
+        }
+    ;
+
+// CONSTRAINT name PRIMARY KEY (col [ASC|DESC], ...)
+pk_constraint returns [PrimaryKeyConstraint ret]
+    :   CONSTRAINT constraintName=identifier PRIMARY KEY LPAREN cols=col_name_with_mod_list RPAREN
+        { $ret = factory.primaryKey(constraintName, cols); }
+    ;
+
+// Comma-separated unqualified column names, each with an optional sort order.
+col_name_with_mod_list returns [List<Pair<ColumnName, ColumnModifier>> ret]
+@init{ret = new ArrayList<Pair<ColumnName, ColumnModifier>>(); }
+    :   first=col_name_with_mod { $ret.add(first); }
+        (COMMA next=col_name_with_mod { $ret.add(next); })*
+    ;
+
+// An unqualified column name with an optional ASC/DESC modifier (null when absent).
+col_name_with_mod returns [Pair<ColumnName, ColumnModifier> ret]
+    :   colName=identifier (sortOrder=ASC | sortOrder=DESC)?
+        {
+            ColumnName column = factory.columnName(colName);
+            $ret = Pair.newPair(column, sortOrder == null ? null : ColumnModifier.fromDDLValue(sortOrder.getText()));
+        }
+    ;
+
+// The indexed columns of CREATE INDEX, as an unnamed primary key constraint.
+index_pk_constraint returns [PrimaryKeyConstraint ret]
+    :   cols=col_def_name_with_mod_list { $ret = factory.primaryKey(null, cols); }
+    ;
+
+// Comma-separated (optionally family-qualified) column names with optional sort order.
+col_def_name_with_mod_list returns [List<Pair<ColumnName, ColumnModifier>> ret]
+@init{ret = new ArrayList<Pair<ColumnName, ColumnModifier>>(); }
+    :   first=col_def_name_with_mod { $ret.add(first); }
+        (COMMA next=col_def_name_with_mod { $ret.add(next); })*
+    ;
+
+// A column_name followed by an optional ASC/DESC modifier (null when absent).
+col_def_name_with_mod returns [Pair<ColumnName, ColumnModifier> ret]
+    :   col=column_name (sortOrder=ASC | sortOrder=DESC)?
+        { $ret = Pair.newPair(col, sortOrder == null ? null : ColumnModifier.fromDDLValue(sortOrder.getText())); }
+    ;
+
+// Comma-separated [family.]name=value properties, collected into a multimap keyed by
+// PropertyName.getFamilyName() with (property name, value) pairs as values.
+fam_properties returns [ListMultimap<String,Pair<String,Object>> ret]
+@init{ret = ArrayListMultimap.<String,Pair<String,Object>>create(); }
+    :   propName=fam_prop_name EQ val=prop_value
+        { $ret.put(propName.getFamilyName(), new Pair<String,Object>(propName.getPropertyName(), val)); }
+        (COMMA propName=fam_prop_name EQ val=prop_value
+            { $ret.put(propName.getFamilyName(), new Pair<String,Object>(propName.getPropertyName(), val)); }
+        )*
+    ;
+
+// A property name, optionally qualified by a column family name.
+fam_prop_name returns [PropertyName ret]
+    :   prop=identifier { $ret = factory.propertyName(prop); }
+    |   family=identifier DOT prop=identifier { $ret = factory.propertyName(family, prop); }
+    ;
+
+// A property value: the value held by a literal.
+prop_value returns [Object ret]
+    :   lit=literal { $ret = lit.getValue(); }
+    ;
+
+// A column name, optionally qualified by a column family name.
+column_name returns [ColumnName ret]
+    :   col=identifier { $ret = factory.columnName(col); }
+    |   fam=identifier DOT col=identifier { $ret = factory.columnName(fam, col); }
+    ;
+
+// One or more comma-separated column names.
+column_names returns [List<ColumnName> ret]
+@init{ret = new ArrayList<ColumnName>(); }
+    :   col=column_name { $ret.add(col); } (COMMA col=column_name { $ret.add(col); })*
+    ;
+
+
+// DROP {TABLE | VIEW} [IF EXISTS] name
+drop_table_node returns [DropTableStatement ret]
+    :   DROP (isView=VIEW | TABLE) (IF ifExists=EXISTS)? tableName=from_table_name
+        { $ret = factory.dropTable(tableName, isView == null ? PTableType.USER : PTableType.VIEW, ifExists != null); }
+    ;
+
+// DROP INDEX [IF EXISTS] index ON table
+drop_index_node returns [DropIndexStatement ret]
+    :   DROP INDEX (IF ifExists=EXISTS)? idx=index_name ON tableName=from_table_name
+        { $ret = factory.dropIndex(idx, tableName, ifExists != null); }
+    ;
+
+// ALTER INDEX [IF EXISTS] index ON table {USABLE | UNUSABLE | REBUILD | DISABLE}
+// The index table is named with the schema of the data table and the index name; the
+// new state keyword is normalized and mapped to a PIndexState constant.
+alter_index_node returns [AlterIndexStatement ret]
+    :   ALTER INDEX (IF ifExists=EXISTS)? idx=index_name ON tableName=from_table_name
+        newState=(USABLE | UNUSABLE | REBUILD | DISABLE)
+        {
+            TableName indexName = factory.table(tableName.getSchemaName(), idx.getName());
+            $ret = factory.alterIndex(factory.namedTable(null, indexName), tableName.getTableName(), ifExists != null,
+                    PIndexState.valueOf(SchemaUtil.normalizeIdentifier(newState.getText())));
+        }
+    ;
+
+// ALTER TABLE name followed by one of:
+//   DROP COLUMN [IF EXISTS] col, ...
+//   ADD [IF NOT EXISTS] column_defs [properties]
+//   SET properties
+// Anything other than DROP COLUMN (including SET) is built with addColumn; for SET the
+// column definition list is null.
+alter_table_node returns [AlterTableStatement ret]
+    :   ALTER TABLE tableName=from_table_name
+        (   (DROP COLUMN (IF ifExists=EXISTS)? dropped=column_names)
+        |   (ADD (IF NOT ifExists=EXISTS)? (added=column_defs) (props=properties)?)
+        |   (SET (props=properties))
+        )
+        {
+            $ret = (dropped != null)
+                ? factory.dropColumn(factory.namedTable(null, tableName), dropped, ifExists != null)
+                : factory.addColumn(factory.namedTable(null, tableName), added, ifExists != null, props);
+        }
+    ;
+
+// A property name, normalized with SchemaUtil.normalizeIdentifier.
+prop_name returns [String ret]
+    :   rawName=identifier { $ret = SchemaUtil.normalizeIdentifier(rawName); }
+    ;
+
+// Comma-separated name=value properties in a HashMap (a repeated name keeps its last value).
+properties returns [Map<String,Object> ret]
+@init{ret = new HashMap<String,Object>(); }
+    :   key=prop_name EQ val=prop_value { $ret.put(key, val); }
+        (COMMA key=prop_name EQ val=prop_value { $ret.put(key, val); })*
+    ;
+
+// One or more comma-separated column definitions.
+column_defs returns [List<ColumnDef> ret]
+@init{ret = new ArrayList<ColumnDef>(); }
+    :   def=column_def { $ret.add(def); } (COMMA def=column_def { $ret.add(def); })*
+    ;
+
+// name type [(n [, m])] [[NOT] NULL] [PRIMARY KEY [ASC|DESC]]
+// The third factory argument is true unless NOT NULL was given.
+column_def returns [ColumnDef ret]
+    :   col=column_name dataType=identifier
+        (LPAREN len=NUMBER (COMMA scale=NUMBER)? RPAREN)?
+        (notNull=NOT? NULL)?
+        (pk=PRIMARY KEY (sortOrder=ASC | sortOrder=DESC)?)?
+        {
+            $ret = factory.columnDef(col, dataType, notNull == null,
+                    len == null ? null : Integer.parseInt(len.getText()),
+                    scale == null ? null : Integer.parseInt(scale.getText()),
+                    pk != null,
+                    sortOrder == null ? null : ColumnModifier.fromDDLValue(sortOrder.getText()));
+        }
+    ;
+
+// One or more comma-separated dynamic column definitions.
+dyn_column_defs returns [List<ColumnDef> ret]
+@init{ret = new ArrayList<ColumnDef>(); }
+    :   def=dyn_column_def { $ret.add(def); } (COMMA def=dyn_column_def { $ret.add(def); })*
+    ;
+
+// A dynamic column: name type [(n [, m])]; never a key column and with no sort modifier.
+dyn_column_def returns [ColumnDef ret]
+    :   col=column_name dataType=identifier (LPAREN len=NUMBER (COMMA scale=NUMBER)? RPAREN)?
+        {
+            $ret = factory.columnDef(col, dataType, true,
+                    len == null ? null : Integer.parseInt(len.getText()),
+                    scale == null ? null : Integer.parseInt(scale.getText()),
+                    false,
+                    null);
+        }
+    ;
+
+// A column name with an optional dynamic type; dataType stays null when no type is given.
+dyn_column_name_or_def returns [ColumnDef ret]
+    :   col=column_name (dataType=identifier (LPAREN len=NUMBER (COMMA scale=NUMBER)? RPAREN)? )?
+        {
+            $ret = factory.columnDef(col, dataType, true,
+                    len == null ? null : Integer.parseInt(len.getText()),
+                    scale == null ? null : Integer.parseInt(scale.getText()),
+                    false,
+                    null);
+        }
+    ;
+
+// SELECT followed by an unhinted select body (used by UPSERT ... SELECT and subqueries).
+select_expression returns [SelectStatement ret]
+    :   SELECT body=select_node { $ret = body; }
+    ;
+
+// A select expression wrapped in a subquery parse node.
+subquery_expression returns [ParseNode ret]
+    :   sel=select_expression { $ret = factory.subquery(sel); }
+    ;
+
+// The body of a select (everything after SELECT). A fresh ParseContext is pushed before
+// the body and popped at the end; its isAggregate() value is passed to factory.select.
+select_node returns [SelectStatement ret]
+@init{ contextStack.push(new ParseContext()); }
+    :   (distinctTok=DISTINCT | ALL)? selected=select_list
+        FROM tables=parseFrom
+        (WHERE whereClause=condition)?
+        (GROUP BY groupBy=group_by)?
+        (HAVING havingClause=condition)?
+        (ORDER BY orderBy=order_by)?
+        (LIMIT rowLimit=limit)?
+        {
+            ParseContext context = contextStack.pop();
+            $ret = factory.select(tables, null, distinctTok != null, selected, whereClause, groupBy, havingClause, orderBy, rowLimit, getBindCount(), context.isAggregate());
+        }
+    ;
+
+// Parses an optional hint followed by a select body.
+// No ParseContext is pushed here. select_node pushes and pops its own, and a push in
+// this rule would never be popped. That would leave a stale context on contextStack,
+// and later contextStack.peek() calls (aggregate tracking in expression_term) would
+// update that stale context instead of the enclosing statement's context.
+hinted_select_node returns [SelectStatement ret]
+    :   (hint=hintClause)?
+        s=select_node
+        { $ret = factory.select(s, hint); }
+    ;
+
+// UPSERT [hint] INTO table [(col [type], ...)] {VALUES (expr, ...) | SELECT ...}
+// Columns given with a type become dynamic column definitions of the target table.
+upsert_node returns [UpsertStatement ret]
+    :   UPSERT (hint=hintClause)? INTO tableName=from_table_name
+        (LPAREN cols=upsert_column_refs RPAREN)?
+        (   (VALUES LPAREN vals=expression_terms RPAREN)
+        |   sel=select_expression
+        )
+        {
+            $ret = factory.upsert(factory.namedTable(null, tableName, cols == null ? null : cols.getFirst()),
+                    hint, cols == null ? null : cols.getSecond(), vals, sel, getBindCount());
+        }
+    ;
+
+// The UPSERT column list. First: definitions of columns that carry a type. Second: the
+// names of all listed columns, in order.
+upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
+@init{ret = new Pair<List<ColumnDef>,List<ColumnName>>(new ArrayList<ColumnDef>(), new ArrayList<ColumnName>()); }
+    :   col=dyn_column_name_or_def
+        {
+            if (col.getDataType() != null) { $ret.getFirst().add(col); }
+            $ret.getSecond().add(col.getColumnDefName());
+        }
+        (COMMA col=dyn_column_name_or_def
+            {
+                if (col.getDataType() != null) { $ret.getFirst().add(col); }
+                $ret.getSecond().add(col.getColumnDefName());
+            }
+        )*
+    ;
+
+// DELETE [hint] FROM table [WHERE cond] [ORDER BY ...] [LIMIT n]
+delete_node returns [DeleteStatement ret]
+    :   DELETE (hint=hintClause)? FROM tableName=from_table_name
+        (WHERE whereClause=condition)?
+        (ORDER BY orderBy=order_by)?
+        (LIMIT rowLimit=limit)?
+        { $ret = factory.delete(factory.namedTable(null, tableName), hint, whereClause, orderBy, rowLimit, getBindCount()); }
+    ;
+
+// A LIMIT value: a bind parameter or an integer literal.
+limit returns [LimitNode ret]
+    :   bindExpr=bind_expression { $ret = factory.limit(bindExpr); }
+    |   count=int_literal { $ret = factory.limit(count); }
+    ;
+
+// A hint comment token, passed through as its text.
+hintClause returns [HintNode ret]
+    :   hintTok=ML_HINT { $ret = factory.hint(hintTok.getText()); }
+    ;
+
+// The select list: comma-separated selectables, or a lone '*' (as in 'select * from').
+select_list returns [List<AliasedNode> ret]
+@init{ret = new ArrayList<AliasedNode>();}
+    :   item=selectable { $ret.add(item); } (COMMA item=selectable { $ret.add(item); })*
+    |   ASTERISK
+        { $ret = Collections.<AliasedNode>singletonList(factory.aliasedNode(null, factory.wildcard())); }
+    ;
+
+// One select list item: an expression with an optional alias (defaulting to the
+// alias of the expression), or 'family.*' selecting all columns of a column family.
+selectable returns [AliasedNode ret]
+    :   expr=expression (alias=parseAlias)?
+        { $ret = factory.aliasedNode(alias == null ? expr.getAlias() : alias, expr); }
+    |   familyName=identifier DOT ASTERISK
+        { $ret = factory.aliasedNode(null, factory.family(familyName)); }
+    ;
+
+// One or more comma-separated GROUP BY expressions.
+group_by returns [List<ParseNode> ret]
+@init{ret = new ArrayList<ParseNode>();}
+    :   expr=expression { $ret.add(expr); }
+        (COMMA expr=expression { $ret.add(expr); })*
+    ;
+
+// One or more comma-separated ORDER BY items.
+order_by returns [List<OrderByNode> ret]
+@init{ret = new ArrayList<OrderByNode>();}
+    :   item=parseOrderByField { $ret.add(item); }
+        (COMMA item=parseOrderByField { $ret.add(item); })*
+    ;
+
+// expr [ASC | DESC] [NULLS {FIRST | LAST}]; defaults are ascending with nulls first.
+parseOrderByField returns [OrderByNode ret]
+@init{boolean isAscending = true; boolean nullsLast = false;}
+    :   (expr=expression)
+        (   ASC { isAscending = true; }
+        |   DESC { isAscending = false; }
+        )?
+        (NULLS (FIRST { nullsLast = false; } | LAST { nullsLast = true; }))?
+        { $ret = factory.orderBy(expr, nullsLast, isAscending); }
+    ;
+
+// A FROM clause: a comma-separated table list, or a table followed by JOIN clauses.
+parseFrom returns [List<TableNode> ret]
+    :   refs=table_refs { $ret = refs; }
+    |   refs=join_specs { $ret = refs; }
+    ;
+
+// One or more comma-separated table references.
+table_refs returns [List<TableNode> ret]
+@init{ret = new ArrayList<TableNode>(4); }
+    :   ref=table_ref { $ret.add(ref); }
+        (COMMA ref=table_ref { $ret.add(ref); })*
+    ;
+
+// A table name with an optional parenthesized list of dynamic column definitions (no alias).
+named_table returns [NamedTableNode ret]
+    :   tableName=from_table_name (LPAREN dynCols=dyn_column_defs RPAREN)?
+        { $ret = factory.namedTable(null, tableName, dynCols); }
+    ;
+
+// A table reference with an optional [AS] alias: a bind parameter, a named table
+// (optionally with dynamic columns), or a parenthesized subselect.
+table_ref returns [TableNode ret]
+    :   bindName=bind_name ((AS)? alias=identifier)?
+        { $ret = factory.bindTable(alias, factory.table(null, bindName)); } // TODO: review
+    |   tableName=from_table_name ((AS)? alias=identifier)? (LPAREN dynCols=dyn_column_defs RPAREN)?
+        { $ret = factory.namedTable(alias, tableName, dynCols); }
+    |   LPAREN SELECT sub=hinted_select_node RPAREN ((AS)? alias=identifier)?
+        { $ret = factory.derivedTable(alias, sub); }
+    ;
+
+// A table followed by one or more JOIN clauses. The result lists the base table first,
+// then each JoinTableNode in source order. The list must be created in @init (as in
+// table_refs); otherwise the first add would dereference a null ret.
+join_specs returns [List<TableNode> ret]
+@init{ret = new ArrayList<TableNode>(4); }
+    :   t=named_table {$ret.add(t);} (s=join_spec { $ret.add(s); })+
+    ;
+
+// join_type JOIN table ON condition
+join_spec returns [JoinTableNode ret]
+    :   kind=join_type JOIN joined=named_table ON onCond=condition
+        { $ret = factory.join(null, joined, onCond, kind); }
+    ;
+
+// INNER, or LEFT / RIGHT / FULL each with an optional OUTER keyword.
+join_type returns [JoinTableNode.JoinType ret]
+    :   INNER        { $ret = JoinTableNode.JoinType.Inner; }
+    |   LEFT OUTER?  { $ret = JoinTableNode.JoinType.Left; }
+    |   RIGHT OUTER? { $ret = JoinTableNode.JoinType.Right; }
+    |   FULL OUTER?  { $ret = JoinTableNode.JoinType.Full; }
+    ;
+
+// An alias name, optionally introduced by AS.
+parseAlias returns [String ret]
+    :   AS? aliasName=parseNoReserved { $ret = aliasName; }
+    ;
+
+// Entry point for boolean conditions (WHERE, HAVING, ON, CASE WHEN): an OR of ANDs.
+condition returns [ParseNode ret]
+    :   orExpr=condition_or { $ret = orExpr; }
+    ;
+
+// One or more OR-separated operands; a single operand is returned as-is.
+condition_or returns [ParseNode ret]
+@init{List<ParseNode> disjuncts = new ArrayList<ParseNode>(4); }
+    :   operand=condition_and { disjuncts.add(operand); }
+        (OR operand=condition_and { disjuncts.add(operand); })*
+        { $ret = disjuncts.size() == 1 ? disjuncts.get(0) : factory.or(disjuncts); }
+    ;
+
+// One or more AND-separated operands; a single operand is returned as-is.
+condition_and returns [ParseNode ret]
+@init{List<ParseNode> conjuncts = new ArrayList<ParseNode>(4); }
+    :   operand=condition_not { conjuncts.add(operand); }
+        (AND operand=condition_not { conjuncts.add(operand); })*
+        { $ret = conjuncts.size() == 1 ? conjuncts.get(0) : factory.and(conjuncts); }
+    ;
+
+// An optionally negated boolean_expr, or an optionally negated parenthesized condition.
+// The syntactic predicate (NOT? boolean_expr)=> tries the boolean_expr alternative
+// first. This is needed because '(' can start either a parenthesized expression_term
+// inside boolean_expr or a parenthesized condition; only if the boolean_expr parse
+// fails is the '(' condition ')' alternative used.
+condition_not returns [ParseNode ret]
+ : (NOT? boolean_expr ) => n=NOT? e=boolean_expr { $ret = n == null ? e : factory.not(e); }
+ | n=NOT? LPAREN e=condition RPAREN { $ret = n == null ? e : factory.not(e); }
+ ;
+
+// A predicate on a left-hand expression l:
+//   l = r, l != r / l <> r, l < r, l > r, l <= r, l >= r   (<= and >= are matched as
+//       the two tokens LT EQ / GT EQ)
+//   l IS [NOT] NULL
+//   l [NOT] LIKE r, l [NOT] EXISTS (subquery), l [NOT] BETWEEN r1 AND r2
+//   l [NOT] IN bind, l [NOT] IN (subquery), l [NOT] IN (expr, ...)
+// For IN with a list, l is prepended to the list items before calling factory.inList.
+// When no operator follows, the last (empty) alternative returns l itself.
+boolean_expr returns [ParseNode ret]
+ : l=expression ((EQ r=expression {$ret = factory.equal(l,r); } )
+ | ((NOEQ1 | NOEQ2) r=expression {$ret = factory.notEqual(l,r); } )
+ | (LT r=expression {$ret = factory.lt(l,r); } )
+ | (GT r=expression {$ret = factory.gt(l,r); } )
+ | (LT EQ r=expression {$ret = factory.lte(l,r); } )
+ | (GT EQ r=expression {$ret = factory.gte(l,r); } )
+ | (IS n=NOT? NULL {$ret = factory.isNull(l,n!=null); } )
+ | ( n=NOT? ((LIKE r=expression {$ret = factory.like(l,r,n!=null); } )
+ | (EXISTS LPAREN r=subquery_expression RPAREN {$ret = factory.exists(l,r,n!=null);} )
+ | (BETWEEN r1=expression AND r2=expression {$ret = factory.between(l,r1,r2,n!=null); } )
+ | ((IN ((r=bind_expression {$ret = factory.inList(Arrays.asList(l,r),n!=null);} )
+ | (LPAREN r=subquery_expression RPAREN {$ret = factory.in(l,r,n!=null);} )
+ | (v=list_expressions {List<ParseNode> il = new ArrayList<ParseNode>(v.size() + 1); il.add(l); il.addAll(v); $ret = factory.inList(il,n!=null);})
+ )))
+ ))
+ | { $ret = l; } )
+ ;
+
+// A bind parameter used as an expression.
+bind_expression returns [BindParseNode ret]
+    :   bindName=bind_name { $ret = factory.bind(bindName); }
+    ;
+
+// Entry point for scalar expressions. Rule nesting (outermost first): expression_add,
+// expression_sub, expression_concat, expression_mult, expression_div, expression_negate.
+expression returns [ParseNode ret]
+    :   sum=expression_add { $ret = sum; }
+    ;
+
+// One or more '+'-separated operands; several operands are folded into one add node.
+expression_add returns [ParseNode ret]
+@init{List<ParseNode> operands = new ArrayList<ParseNode>(4); }
+    :   operand=expression_sub { operands.add(operand); }
+        (PLUS operand=expression_sub { operands.add(operand); })*
+        { $ret = operands.size() == 1 ? operands.get(0) : factory.add(operands); }
+    ;
+
+// One or more '-'-separated operands; several operands are folded into one subtract node.
+expression_sub returns [ParseNode ret]
+@init{List<ParseNode> operands = new ArrayList<ParseNode>(4); }
+    :   operand=expression_concat { operands.add(operand); }
+        (MINUS operand=expression_concat { operands.add(operand); })*
+        { $ret = operands.size() == 1 ? operands.get(0) : factory.subtract(operands); }
+    ;
+
+// One or more '||'-separated operands; several operands are folded into one concat node.
+expression_concat returns [ParseNode ret]
+@init{List<ParseNode> operands = new ArrayList<ParseNode>(4); }
+    :   operand=expression_mult { operands.add(operand); }
+        (CONCAT operand=expression_mult { operands.add(operand); })*
+        { $ret = operands.size() == 1 ? operands.get(0) : factory.concat(operands); }
+    ;
+
+// One or more '*'-separated operands; several operands are folded into one multiply node.
+expression_mult returns [ParseNode ret]
+@init{List<ParseNode> operands = new ArrayList<ParseNode>(4); }
+    :   operand=expression_div { operands.add(operand); }
+        (ASTERISK operand=expression_div { operands.add(operand); })*
+        { $ret = operands.size() == 1 ? operands.get(0) : factory.multiply(operands); }
+    ;
+
+// One or more '/'-separated operands; several operands are folded into one divide node.
+expression_div returns [ParseNode ret]
+@init{List<ParseNode> operands = new ArrayList<ParseNode>(4); }
+    :   operand=expression_negate { operands.add(operand); }
+        (DIVIDE operand=expression_negate { operands.add(operand); })*
+        { $ret = operands.size() == 1 ? operands.get(0) : factory.divide(operands); }
+    ;
+
+// A term with an optional leading minus sign.
+expression_negate returns [ParseNode ret]
+    :   minus=MINUS? term=expression_term { $ret = (minus == null) ? term : factory.negate(term); }
+    ;
+
+// The lowest-level expression: column references, function calls, literals and binds,
+// CASE, parenthesized expressions / row value constructors, and CAST.
+// - A column or literal/bind term may be followed by the '(+)' outer-join marker.
+// - Function calls pass f.isAggregate() to the top ParseContext via setAggregate.
+//   'f(*)' is only accepted for COUNT; any other function is rejected at the '*'.
+// - CAST accepts the standard CAST(expr AS type) form as well as the legacy
+//   unparenthesized CAST expr AS type form (which includes CAST (expr) AS type).
+//   Both forms can start with '(' after CAST, so the syntactic predicate is needed.
+//   Without it, the '(' was always taken as a parenthesized expression, which then
+//   failed at AS, so CAST(x AS type) could not be parsed.
+expression_term returns [ParseNode ret]
+@init{ParseNode n;boolean isAscending=true;}
+    :   field=identifier oj=OUTER_JOIN? {n = factory.column(null,field,field); $ret = oj==null ? n : factory.outer(n); }
+    |   tableName=table_name DOT field=identifier oj=OUTER_JOIN? {n = factory.column(tableName, field, field); $ret = oj==null ? n : factory.outer(n); }
+    |   field=identifier LPAREN l=expression_list RPAREN wg=(WITHIN GROUP LPAREN ORDER BY l2=expression_terms (ASC {isAscending = true;} | DESC {isAscending = false;}) RPAREN)?
+        {
+            FunctionParseNode f = wg==null ? factory.function(field, l) : factory.function(field,l,l2,isAscending);
+            contextStack.peek().setAggregate(f.isAggregate());
+            $ret = f;
+        }
+    |   field=identifier LPAREN t=ASTERISK RPAREN
+        {
+            if (!isCountFunction(field)) {
+                throwRecognitionException(t);
+            }
+            FunctionParseNode f = factory.function(field, LiteralParseNode.STAR);
+            contextStack.peek().setAggregate(f.isAggregate());
+            $ret = f;
+        }
+    |   field=identifier LPAREN t=DISTINCT l=expression_list RPAREN
+        {
+            FunctionParseNode f = factory.functionDistinct(field, l);
+            contextStack.peek().setAggregate(f.isAggregate());
+            $ret = f;
+        }
+    |   e=literal_or_bind_value oj=OUTER_JOIN? { n = e; $ret = oj==null ? n : factory.outer(n); }
+    |   e=case_statement { $ret = e; }
+    |   LPAREN l=expression_terms RPAREN
+        {
+            if(l.size() == 1) {
+                $ret = l.get(0);
+            }
+            else {
+                $ret = factory.rowValueConstructor(l);
+            }
+        }
+    |   CAST
+        (   (LPAREN expression AS)=> LPAREN e=expression AS dt=identifier RPAREN
+        |   e=expression AS dt=identifier
+        )
+        { $ret = factory.cast(e, dt); }
+    ;
+
+// One or more comma-separated expressions.
+expression_terms returns [List<ParseNode> ret]
+@init{ret = new ArrayList<ParseNode>(); }
+    :   item=expression { $ret.add(item); }
+        (COMMA item=expression { $ret.add(item); })*
+    ;
+
+// Zero or more comma-separated expressions (a function argument list). The commas
+// are nested inside the optional group so that a leading comma, as in f(, a), is a
+// syntax error instead of being silently accepted.
+expression_list returns [List<ParseNode> ret]
+@init{ret = new ArrayList<ParseNode>(); }
+    :   (v=expression {$ret.add(v);} (COMMA v=expression {$ret.add(v);} )* )?
+    ;
+
+// An index name (a single identifier).
+index_name returns [NamedNode ret]
+    :   idName=identifier { $ret = factory.indexName(idName); }
+    ;
+
+// TODO: table_name and from_table_name are identical; figure out how not to repeat them.
+// [schema.]table, as used to qualify a column reference in expression_term.
+table_name returns [TableName ret]
+    :   tbl=identifier { $ret = factory.table(null, tbl); }
+    |   schema=identifier DOT tbl=identifier { $ret = factory.table(schema, tbl); }
+    ;
+
+// TODO: identical to table_name; figure out how not to repeat them.
+// [schema.]table, as used by FROM clauses and DDL/DML statements.
+from_table_name returns [TableName ret]
+    :   tbl=identifier { $ret = factory.table(null, tbl); }
+    |   schema=identifier DOT tbl=identifier { $ret = factory.table(schema, tbl); }
+    ;
+
+// A literal value or a bind parameter.
+literal_or_bind_value returns [ParseNode ret]
+    :   lit=literal { $ret = lit; }
+    |   bindName=bind_name { $ret = factory.bind(bindName); }
+    ;
+
+// A string, integer, long, double, decimal, boolean or NULL literal.
+literal returns [LiteralParseNode ret]
+    :   str=STRING_LITERAL { $ret = factory.literal(str.getText()); }
+    |   num=int_literal { $ret = num; }
+    |   num=long_literal { $ret = num; }
+    |   num=double_literal { $ret = num; }
+    |   dec=DECIMAL
+        {
+            try {
+                $ret = factory.literal(new BigDecimal(dec.getText()));
+            } catch (NumberFormatException nfe) { // should not happen: the lexer only emits well-formed DECIMAL text
+                throwRecognitionException(dec);
+            }
+        }
+    |   NULL  { $ret = factory.literal(null); }
+    |   TRUE  { $ret = factory.literal(Boolean.TRUE); }
+    |   FALSE { $ret = factory.literal(Boolean.FALSE); }
+    ;
+
+// An unsuffixed integer: an int literal when the value fits in an int, else a long literal.
+// Values that do not fit in a long are reported as a RecognitionException.
+int_literal returns [LiteralParseNode ret]
+    :   num=NUMBER
+        {
+            try {
+                Long parsed = Long.valueOf(num.getText());
+                boolean fitsInInt = parsed >= Integer.MIN_VALUE && parsed <= Integer.MAX_VALUE;
+                if (fitsInInt) {
+                    $ret = factory.literal(parsed.intValue());
+                } else {
+                    $ret = factory.literal(parsed);
+                }
+            } catch (NumberFormatException nfe) { // only reachable when the digits overflow a long
+                throwRecognitionException(num);
+            }
+        }
+    ;
+
+// An integer with an L/l suffix, parsed (without the suffix) as a long literal.
+long_literal returns [LiteralParseNode ret]
+    :   lng=LONG
+        {
+            try {
+                String digits = lng.getText();
+                Long parsed = Long.valueOf(digits.substring(0, digits.length() - 1));
+                $ret = factory.literal(parsed);
+            } catch (NumberFormatException nfe) { // only reachable when the digits overflow a long
+                throwRecognitionException(lng);
+            }
+        }
+    ;
+
+// A decimal with a D/d suffix, parsed (without the suffix) as a double literal.
+double_literal returns [LiteralParseNode ret]
+    :   dbl=DOUBLE
+        {
+            try {
+                String digits = dbl.getText();
+                Double parsed = Double.valueOf(digits.substring(0, digits.length() - 1));
+                $ret = factory.literal(parsed);
+            } catch (NumberFormatException nfe) { // should not happen for lexer-produced DOUBLE text
+                throwRecognitionException(dbl);
+            }
+        }
+    ;
+
+// A parenthesized, non-empty, comma-separated list of expressions.
+list_expressions returns [List<ParseNode> ret]
+@init{ret = new ArrayList<ParseNode>(); }
+    :   LPAREN item=expression { $ret.add(item); } (COMMA item=expression { $ret.add(item); })* RPAREN
+    ;
+
+// A bind name or a NAME token, returned as text.
+table returns [String ret]
+    :   bindName=bind_name { $ret = bindName; }
+    |   plainName=parseNoReserved { $ret = plainName; }
+    ;
+
+// A bind parameter: a BIND_NAME token (a colon followed by a suffix) or an anonymous '?'.
+// The suffix of a named bind is passed to updateBind(), which parses it as an int to
+// keep the bind count in step. A suffix that is not a valid int (letters, underscores,
+// or an out-of-range number) is reported as a RecognitionException at the bind token,
+// in the same way the numeric literal rules handle NumberFormatException.
+bind_name returns [String ret]
+    :   bname=BIND_NAME
+        {
+            String bnameStr = bname.getText().substring(1);
+            try {
+                updateBind(bnameStr);
+            } catch (NumberFormatException nfe) {
+                throwRecognitionException(bname);
+            }
+            $ret = bnameStr;
+        }
+    |   QUESTION { $ret = nextBind(); } // TODO: only support this?
+    ;
+
+// An identifier: currently just the text of a NAME token.
+identifier returns [String ret]
+    :   nameText=parseNoReserved { $ret = nameText; }
+    ;
+
+// The text of a NAME token.
+parseNoReserved returns [String ret]
+    :   nameToken=NAME { $ret = nameToken.getText(); }
+    ;
+
+// CASE in two forms, both collected as (result, condition) pairs plus an optional
+// trailing ELSE result:
+//   CASE operand WHEN value THEN result ... [ELSE result] END
+//       (each condition is built as operand = value)
+//   CASE WHEN condition THEN result ... [ELSE result] END
+case_statement returns [ParseNode ret]
+@init{List<ParseNode> w = new ArrayList<ParseNode>(4);}
+    :   CASE operand=expression
+        (WHEN whenValue=expression THEN result=expression { w.add(result); w.add(factory.equal(operand, whenValue)); })+
+        (ELSE fallback=expression { w.add(fallback); })?
+        END { $ret = factory.caseWhen(w); }
+    |   CASE
+        (WHEN cond=condition THEN result=expression { w.add(result); w.add(cond); })+
+        (ELSE fallback=expression { w.add(fallback); })?
+        END { $ret = factory.caseWhen(w); }
+    ;
+
+// --------------------------------------
+// The Lexer
+
+// Comment and hint delimiters. The hint opener is '/*+'; plain comments open with '/*'.
+HINT_START: '/*+' ;
+COMMENT_START: '/*';
+COMMENT_AND_HINT_END: '*/' ;
+SL_COMMENT1: '//';
+SL_COMMENT2: '--';
+
+// Bind names start with a colon followed by 1 or more letters/digits/underscores.
+// NOTE(review): the parser passes the suffix to updateBind(), which uses
+// Integer.parseInt, so only all-digit suffixes that fit in an int are usable.
+BIND_NAME
+ : COLON (LETTER|DIGIT|'_')+
+ ;
+
+// Names: an ASCII letter followed by any FIELDCHARs (letters, digits, underscores or
+// chars from U+0080 to U+FFFE), optionally followed by a double-quoted part; or a
+// double-quoted name on its own.
+NAME
+ : LETTER (FIELDCHAR)* ('\"' (DBL_QUOTE_CHAR)* '\"')?
+ | '\"' (DBL_QUOTE_CHAR)* '\"'
+ ;
+
+// An unsigned integer (a leading minus sign is handled by expression_negate).
+NUMBER
+ : POSINTEGER
+ ;
+
+// An unsigned integer with an L/l suffix.
+LONG
+ : POSINTEGER ('L'|'l')
+ ;
+
+// An unsigned decimal with an optional integer part, e.g. 1.5 or .5.
+// Exponential format is not supported.
+DECIMAL
+ : POSINTEGER? '.' POSINTEGER
+ ;
+
+// A DECIMAL with a D/d suffix.
+DOUBLE
+ : DECIMAL ('D'|'d')
+ ;
+
+// Operator and punctuation tokens.
+// NOTE(review): DOUBLE_QUOTE overlaps the quoted alternative of NAME, which also starts with '"';
+// confirm which token a stray '"' is meant to produce.
+DOUBLE_QUOTE
+ : '"'
+ ;
+
+EQ
+ : '='
+ ;
+
+LT
+ : '<'
+ ;
+
+GT
+ : '>'
+ ;
+
+// Multi-character operators below share a first character with the single-character tokens above;
+// the lexer is expected to prefer the longer match.
+DOUBLE_EQ
+ : '=''='
+ ;
+
+// Two spellings of "not equal": '!=' and '<>'.
+NOEQ1
+ : '!''='
+ ;
+
+NOEQ2
+ : '<''>'
+ ;
+
+// '||' (string concatenation, presumably).
+CONCAT
+ : '|''|'
+ ;
+
+COMMA
+ : ','
+ ;
+
+LPAREN
+ : '('
+ ;
+
+RPAREN
+ : ')'
+ ;
+
+SEMICOLON
+ : ';'
+ ;
+
+COLON
+ : ':'
+ ;
+
+QUESTION
+ : '?'
+ ;
+
+LSQUARE
+ : '['
+ ;
+
+RSQUARE
+ : ']'
+ ;
+
+LCURLY
+ : '{'
+ ;
+
+RCURLY
+ : '}'
+ ;
+
+AT
+ : '@'
+ ;
+
+TILDE
+ : '~'
+ ;
+
+PLUS
+ : '+'
+ ;
+
+MINUS
+ : '-'
+ ;
+
+ASTERISK
+ : '*'
+ ;
+
+DIVIDE
+ : '/'
+ ;
+
+// "(+)" as a single token - presumably the Oracle-style outer-join marker. It competes with
+// LPAREN PLUS RPAREN; the lexer is expected to take the longer match.
+OUTER_JOIN
+ : '(' '+' ')'
+ ;
+// A FieldCharacter is a letter, digit, underscore, or any character in the range U+0080..U+FFFE
+// (all non-ASCII characters of the Basic Multilingual Plane except U+FFFF).
+fragment
+FIELDCHAR
+ : LETTER
+ | DIGIT
+ | '_'
+ | '\u0080'..'\ufffe'
+ ;
+
+// A Letter is a lower or upper case ASCII character.
+fragment
+LETTER
+ : 'a'..'z'
+ | 'A'..'Z'
+ ;
+
+// One or more ASCII digits (no sign).
+fragment
+POSINTEGER
+ : DIGIT+
+ ;
+
+// A single ASCII digit.
+fragment
+DIGIT
+ : '0'..'9'
+ ;
+
+// String literals: '...'. The emitted token text is the decoded contents without the enclosing
+// quotes. Plain runs (CHAR) are appended as matched. For escapes (CHAR_ESC) the decoded text is
+// read with getText(), which returns the text CHAR_ESC stored via setText(). t.getText() would
+// instead give the raw source characters (ANTLR 3 Java runtime behaviour - re-check if the
+// runtime is upgraded). The final setText() replaces whatever an escape left behind.
+STRING_LITERAL
+@init{ StringBuilder sb = new StringBuilder(); }
+ : '\''
+ ( t=CHAR { sb.append(t.getText()); }
+ | t=CHAR_ESC { sb.append(getText()); }
+ )* '\'' { setText(sb.toString()); }
+ ;
+
+// A run of characters that need no decoding: anything except a single quote or a backslash.
+fragment
+CHAR
+ : ( ~('\'' | '\\') )+
+ ;
+
+// A run of characters inside a double-quoted name: anything except '"'.
+fragment
+DBL_QUOTE_CHAR
+ : ( ~('\"') )+
+ ;
+
+// Escape sequence inside a string literal; its decoded text is stored with setText().
+// - \n \r \t \b \f \" \' \\ decode to the usual characters.
+// - \_ and \% keep their backslash, presumably so LIKE patterns can match a literal '_' or '%'.
+//   Inside the action, "\%" is ANTLR's escape for a literal '%' (which otherwise starts a
+//   template expression) - TODO confirm against the ANTLR 3 action syntax.
+// - Two consecutive single quotes ('') decode to one single quote (SQL-style).
+// Any other backslash sequence (e.g. \x) has no alternative here and is a lexing error.
+fragment
+CHAR_ESC
+ : '\\'
+ ( 'n' { setText("\n"); }
+ | 'r' { setText("\r"); }
+ | 't' { setText("\t"); }
+ | 'b' { setText("\b"); }
+ | 'f' { setText("\f"); }
+ | '\"' { setText("\""); }
+ | '\'' { setText("\'"); }
+ | '\\' { setText("\\"); }
+ | '_' { setText("\\_"); }
+ | '%' { setText("\\\%"); }
+ )
+ | '\'\'' { setText("\'"); }
+ ;
+
+// Whitespace: spaces and tabs are sent to the hidden channel. They stay in the token stream, but
+// the parser does not see them.
+WS
+ : ( ' ' | '\t' ) { $channel=HIDDEN; }
+ ;
+
+// Line breaks ('\r' or '\n', one character per token) are dropped entirely.
+EOL
+ : ('\r' | '\n')
+ { skip(); }
+ ;
+
+// Hint: /*+ ... */. The token text is everything between '/*+' and '*/'.
+// sb receives $text (which still starts with the '/*+' matched by h) before the closing '*/' is
+// matched, and the final setText drops the first h.getText().length() characters.
+// Keep everything in comment in a case sensitive manner
+ML_HINT
+@init{ StringBuilder sb = new StringBuilder(); }
+ : h=HINT_START ( options {greedy=false;} : t=.)* { sb.append($text); } COMMENT_AND_HINT_END
+ { setText(sb.substring(h.getText().length())); } // Get rid of the HINT_START text
+ ;
+
+// Block comment /* ... */, discarded. The (~PLUS) after '/*' keeps this rule from matching hints.
+// NOTE(review): because (~PLUS) must consume a character, an empty comment "/**/" cannot match
+// this rule (its '*' is consumed, leaving no "*/" behind) - likely needs a fix.
+ML_COMMENT
+ : COMMENT_START (~PLUS) ( options {greedy=false;} : . )* COMMENT_AND_HINT_END
+ { skip(); }
+ ;
+
+// Single-line comment starting with '//' or '--', discarded.
+// NOTE(review): a line break is required at the end, so such a comment on the last line of input
+// without a trailing newline is expected not to match - confirm.
+SL_COMMENT
+ : (SL_COMMENT1 | SL_COMMENT2) ( options {greedy=false;} : . )* EOL
+ { skip(); }
+ ;
+
+DOT
+ : '.'
+ ;
+
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/config/csv-bulk-load-config.properties
----------------------------------------------------------------------
diff --git a/src/main/config/csv-bulk-load-config.properties b/src/main/config/csv-bulk-load-config.properties
new file mode 100644
index 0000000..2d81808
--- /dev/null
+++ b/src/main/config/csv-bulk-load-config.properties
@@ -0,0 +1,5 @@
+# Compress intermediate map output.
+mapreduce.map.output.compress=true
+# NOTE(review): Hadoop normally reads this key as a single codec class name. The second entry is the
+# CompressionCodec interface, not a codec, and looks unintended - confirm how the CSV bulk loader
+# consumes this value.
+mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.CompressionCodec
+# Map-side sort tuning (these look like Hadoop 1.x-era property names).
+io.sort.record.percent=0.2
+io.sort.factor=20
+# NOTE(review): this is a TaskTracker (daemon) setting; it presumably has no effect when set in a
+# per-job configuration - confirm.
+mapred.tasktracker.map.tasks.maximum=10
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java b/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
new file mode 100644
index 0000000..8314fef
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/IndexKeyValueSkipListSet.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Like a {@link KeyValueSkipListSet}, but also exposes useful, atomic methods (e.g.
+ * {@link #putIfAbsent(KeyValue)}).
+ * <p>
+ * The backing {@link ConcurrentSkipListMap} is handed to the parent set and also kept here, so
+ * {@link #putIfAbsent(KeyValue)} can use the map's atomic insert directly.
+ */
+public class IndexKeyValueSkipListSet extends KeyValueSkipListSet {
+
+  // It's annoying that we need to keep this extra pointer around here (the parent holds the same
+  // map), but it's pretty minimal and means we don't need to change the HBase code.
+  private final ConcurrentSkipListMap<KeyValue, KeyValue> delegate;
+
+  /**
+   * Create a new, empty {@link IndexKeyValueSkipListSet} based on the passed comparator.
+   * @param comparator to use when comparing keyvalues. It is used both to determine sort order as
+   *          well as object equality in the set.
+   * @return a new set backed by a map that uses the passed comparator
+   */
+  public static IndexKeyValueSkipListSet create(Comparator<KeyValue> comparator) {
+    ConcurrentSkipListMap<KeyValue, KeyValue> delegate =
+        new ConcurrentSkipListMap<KeyValue, KeyValue>(comparator);
+    return new IndexKeyValueSkipListSet(delegate);
+  }
+
+  /**
+   * @param delegate map to which to delegate all calls. It is shared, not copied, so later changes
+   *          made directly to the map are visible through this set.
+   */
+  public IndexKeyValueSkipListSet(ConcurrentSkipListMap<KeyValue, KeyValue> delegate) {
+    super(delegate);
+    this.delegate = delegate;
+  }
+
+  /**
+   * Add the passed {@link KeyValue} to the set, only if an equal one (according to the set's
+   * comparator) is not already present. This is equivalent to
+   * <pre>
+   * KeyValue existing = map.get(kv);
+   * if (existing == null) {
+   *   map.put(kv, kv);
+   *   return null;
+   * }
+   * return existing;
+   * </pre>
+   * except that the action is performed atomically. The {@link KeyValue} is stored as both key and
+   * value of the backing map.
+   * @param kv {@link KeyValue} to add
+   * @return the equal {@link KeyValue} already in the set, or <tt>null</tt> if there was none (in
+   *         which case <tt>kv</tt> was added)
+   * @throws ClassCastException if the specified key cannot be compared with the keys currently in
+   *           the map
+   * @throws NullPointerException if the specified key is null
+   */
+  public KeyValue putIfAbsent(KeyValue kv) {
+    return this.delegate.putIfAbsent(kv, kv);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
new file mode 100644
index 0000000..bad82c4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java
@@ -0,0 +1,152 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.io.Writable;
+
+
+
+/**
+ * A WALReader that can also deserialize custom {@link WALEdit}s that contain index information.
+ * <p>
+ * This is basically a wrapper around a {@link SequenceFileLogReader} that has a custom
+ * {@link SequenceFileLogReader.WALReader#next(Object)} method that only replaces the creation of the WALEdit with our own custom
+ * type
+ * <p>
+ * This is a little bit of a painful way of going about this, but saves the effort of hacking the
+ * HBase source (and deal with getting it reviewed and backported, etc.) and still works.
+ */
+/*
+ * TODO: Support splitting index updates into their own WAL entries on recovery (basically, just
+ * queue them up in next), if we know that the region was on the server when it crashed. However,
+ * this is kind of difficult as we need to know a lot of things the state of the system - basically,
+ * we need to track which of the regions were on the server when it crashed only only split those
+ * edits out into their respective regions.
+ */
+public class IndexedHLogReader implements Reader {
+ private static final Log LOG = LogFactory.getLog(IndexedHLogReader.class);
+
+ private SequenceFileLogReader delegate;
+
+
+ private static class IndexedWALReader extends SequenceFileLogReader.WALReader {
+
+ /**
+ * @param fs
+ * @param p
+ * @param c
+ * @throws IOException
+ */
+ IndexedWALReader(FileSystem fs, Path p, Configuration c) throws IOException {
+ super(fs, p, c);
+ }
+
+ /**
+ * we basically have to reproduce what the SequenceFile.Reader is doing in next(), but without
+ * the check on the value class, since we have a special value class that doesn't directly match
+ * what was specified in the file header
+ */
+ @Override
+ public synchronized boolean next(Writable key, Writable val) throws IOException {
+ boolean more = next(key);
+
+ if (more) {
+ getCurrentValue(val);
+ }
+
+ return more;
+ }
+
+ }
+
+ public IndexedHLogReader() {
+ this.delegate = new SequenceFileLogReader();
+ }
+
+ @Override
+ public void init(final FileSystem fs, final Path path, Configuration conf) throws IOException {
+ this.delegate.init(fs, path, conf);
+ // close the old reader and replace with our own, custom one
+ this.delegate.reader.close();
+ this.delegate.reader = new IndexedWALReader(fs, path, conf);
+ Exception e = new Exception();
+ LOG.info("Instantiated indexed log reader." + Arrays.toString(e.getStackTrace()));
+ LOG.info("Got conf: " + conf);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.delegate.close();
+ }
+
+ @Override
+ public Entry next() throws IOException {
+ return next(null);
+ }
+
+ @Override
+ public Entry next(Entry reuse) throws IOException {
+ delegate.entryStart = delegate.reader.getPosition();
+ HLog.Entry e = reuse;
+ if (e == null) {
+ HLogKey key;
+ if (delegate.keyClass == null) {
+ key = HLog.newKey(delegate.conf);
+ } else {
+ try {
+ key = delegate.keyClass.newInstance();
+ } catch (InstantiationException ie) {
+ throw new IOException(ie);
+ } catch (IllegalAccessException iae) {
+ throw new IOException(iae);
+ }
+ }
+ WALEdit val = new WALEdit();
+ e = new HLog.Entry(key, val);
+ }
+
+ // now read in the HLog.Entry from the WAL
+ boolean nextPairValid = false;
+ try {
+ if (delegate.compressionContext != null) {
+ throw new UnsupportedOperationException(
+ "Reading compression isn't supported with the IndexedHLogReader! Compresed WALEdits "
+ + "are only support for HBase 0.94.9+ and with the IndexedWALEditCodec!");
+ }
+ // this is the special bit - we use our custom entry to read in the key-values that have index
+ // information, but otherwise it looks just like a regular WALEdit
+ IndexedWALEdit edit = new IndexedWALEdit(e.getEdit());
+ nextPairValid = delegate.reader.next(e.getKey(), edit);
+ } catch (IOException ioe) {
+ throw delegate.addFileInfoToException(ioe);
+ }
+ delegate.edit++;
+ if (delegate.compressionContext != null && delegate.emptyCompressionContext) {
+ delegate.emptyCompressionContext = false;
+ }
+ return nextPairValid ? e : null;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ this.delegate.seek(pos);
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return this.delegate.getPosition();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ this.delegate.reset();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
new file mode 100644
index 0000000..66f7ffc
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
@@ -0,0 +1,91 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.wal.KeyValueCodec;
+
+/**
+ * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an
+ * IndexedHLogReader.
+ * <p>
+ * This class should only be used with HBase &lt; 0.94.9. Newer installations of HBase should
+ * instead use the IndexedWALEditCodec along with the correct configuration options.
+ */
+public class IndexedWALEdit extends WALEdit {
+  // reproduced here so we don't need to modify the HBase source.
+  private static final int VERSION_2 = -1;
+  // Edit that readFields() populates: the wrapped edit, or this instance itself when it was built
+  // with the no-arg constructor.
+  private final WALEdit delegate;
+
+  /**
+   * Copy-constructor. Only does a surface copy of the delegate's fields - no actual data is
+   * copied, only referenced. The delegate's key-values and scopes are cleared, so it is ready to
+   * be filled by {@link #readFields(DataInput)}.
+   * @param delegate edit to read into
+   */
+  @SuppressWarnings("deprecation")
+  public IndexedWALEdit(WALEdit delegate) {
+    this.delegate = delegate;
+    // reset the delegate's fields
+    this.delegate.getKeyValues().clear();
+    if (this.delegate.getScopes() != null) {
+      this.delegate.getScopes().clear();
+    }
+  }
+
+  /**
+   * Create an edit that reads into itself. Without a delegate, every call to
+   * {@link #readFields(DataInput)} would fail with a {@link NullPointerException}.
+   */
+  public IndexedWALEdit() {
+    this.delegate = this;
+  }
+
+  @Override
+  public void setCompressionContext(CompressionContext context) {
+    throw new UnsupportedOperationException(
+        "Compression not supported for IndexedWALEdit! If you are using HBase 0.94.9+, use IndexedWALEditCodec instead.");
+  }
+
+  /**
+   * Replace the delegate's key-values and scopes with the next edit from <tt>in</tt>.
+   * @throws IOException if the entry is not in the VERSION_2 (multi-KeyValue) layout, or on read
+   *           failure
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    delegate.getKeyValues().clear();
+    if (delegate.getScopes() != null) {
+      delegate.getScopes().clear();
+    }
+    // ----------------------------------------------------------------------------------------
+    // no compression, so we do pretty much what the usual WALEdit does, plus a little magic to
+    // capture the index updates
+    // -----------------------------------------------------------------------------------------
+    int versionOrLength = in.readInt();
+    if (versionOrLength != VERSION_2) {
+      throw new IOException("You must update your cluster to the lastest version of HBase and"
+          + " clean out all logs (cleanly start and then shutdown) before enabling indexing!");
+    }
+    // this is new style HLog entry containing multiple KeyValues.
+    List<KeyValue> kvs = KeyValueCodec.readKeyValues(in);
+    delegate.getKeyValues().addAll(kvs);
+
+    // then read in the rest of the WALEdit: the per-family replication scopes
+    int numFamilies = in.readInt();
+    NavigableMap<byte[], Integer> scopes = delegate.getScopes();
+    if (numFamilies > 0) {
+      if (scopes == null) {
+        scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      }
+      for (int i = 0; i < numFamilies; i++) {
+        byte[] fam = Bytes.readByteArray(in);
+        int scope = in.readInt();
+        scopes.put(fam, scope);
+      }
+      delegate.setScopes(scopes);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new IOException(
+        "Indexed WALEdits aren't written directly out - use IndexedKeyValues instead");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
new file mode 100644
index 0000000..91c03bc
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -0,0 +1,195 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.BaseDecoder;
+import org.apache.hadoop.hbase.codec.BaseEncoder;
+import org.apache.hadoop.hbase.codec.Decoder;
+import org.apache.hadoop.hbase.codec.Encoder;
+
+import org.apache.hbase.index.wal.IndexedKeyValue;
+import org.apache.hbase.index.wal.KeyValueCodec;
+
+
+/**
+ * Support custom indexing {@link KeyValue}s when written to the WAL.
+ * <p>
+ * Currently, we don't support reading older WAL files - only new WAL files. Therefore, this should
+ * not be installed on a running cluster, but rather one that has been cleanly shutdown and requires
+ * no WAL replay on startup.
+ * <p>
+ * Without compression, every key-value is written and read with {@link KeyValueCodec}. With
+ * compression, each key-value is preceded by a one-byte marker. Regular key-values then go through
+ * the parent's (compressing) encoder/decoder; {@link IndexedKeyValue}s go through
+ * {@link KeyValueCodec}.
+ */
+public class IndexedWALEditCodec extends WALEditCodec {
+
+  // can't have negative values because reading off a stream returns a negative if it's the end of
+  // the stream
+  private static final int REGULAR_KEY_VALUE_MARKER = 0;
+  // Marker written ahead of an IndexedKeyValue in a compressed WAL. It is defined here rather than
+  // borrowed from KeyValueCodec.INDEX_TYPE_LENGTH_MARKER so that, even as a single byte, it is
+  // guaranteed to differ from REGULAR_KEY_VALUE_MARKER. The decoder treats any byte other than
+  // REGULAR_KEY_VALUE_MARKER as "indexed", so its behaviour on existing WALs is unchanged.
+  private static final int INDEXED_KEY_VALUE_MARKER = 1;
+  private CompressionContext compression;
+
+  /** Required nullary constructor */
+  public IndexedWALEditCodec() {
+  }
+
+  /**
+   * Override the parent implementation so we can get access to the current context too
+   * @param compression compression to support for the encoder/decoder
+   */
+  @Override
+  public void setCompression(CompressionContext compression) {
+    super.setCompression(compression);
+    this.compression = compression;
+  }
+
+  @Override
+  public Decoder getDecoder(InputStream is) {
+    // compression isn't enabled
+    if (this.compression == null) {
+      return new IndexKeyValueDecoder(is);
+    }
+
+    // there is compression, so we get the standard decoder to handle reading those kvs
+    Decoder decoder = super.getDecoder(is);
+    // compression is on, return our custom decoder
+    return new CompressedIndexKeyValueDecoder(is, decoder);
+  }
+
+  @Override
+  public Encoder getEncoder(OutputStream os) {
+    // compression isn't on, do the default thing
+    if (this.compression == null) {
+      return new IndexKeyValueEncoder(os);
+    }
+
+    // compression is on, return our one that will handle putting in the correct markers
+    Encoder encoder = super.getEncoder(os);
+    return new CompressedIndexKeyValueEncoder(os, encoder);
+  }
+
+  // Ensure the stream can be read via DataInput, as required by KeyValueCodec. DataInputStream does
+  // not buffer, so reads through the wrapper stay in step with other readers of the same stream.
+  private static InputStream asDataInput(InputStream is) {
+    return is instanceof DataInput ? is : new java.io.DataInputStream(is);
+  }
+
+  // Ensure the stream can be written via DataOutput, as required by KeyValueCodec. DataOutputStream
+  // does not buffer, so writes stay ordered with other writers of the same stream.
+  private static OutputStream asDataOutput(OutputStream os) {
+    return os instanceof DataOutput ? os : new java.io.DataOutputStream(os);
+  }
+
+  /**
+   * Custom {@link Decoder} that can handle a stream of regular and indexed {@link KeyValue}s
+   * written without compression.
+   */
+  public class IndexKeyValueDecoder extends BaseDecoder {
+
+    /**
+     * Create a {@link Decoder} on the given input stream.
+     * @param is stream to read from. It is wrapped in a {@link java.io.DataInputStream} if it does
+     *          not already implement {@link DataInput}.
+     */
+    public IndexKeyValueDecoder(InputStream is) {
+      super(asDataInput(is));
+    }
+
+    @Override
+    protected KeyValue parseCell() throws IOException {
+      return KeyValueCodec.readKeyValue((DataInput) this.in);
+    }
+  }
+
+  /**
+   * {@link Decoder} for compressed WALs. Reads the one-byte marker that precedes each key-value,
+   * then either takes the next regular {@link KeyValue} from the compressed decoder or parses an
+   * {@link IndexedKeyValue} with {@link KeyValueCodec}.
+   */
+  public class CompressedIndexKeyValueDecoder extends BaseDecoder {
+
+    private final Decoder decoder;
+
+    /**
+     * Create a {@link Decoder} on the given input stream with the given {@link Decoder} to parse
+     * generic {@link KeyValue}s.
+     * @param is stream to read from. It is wrapped in a {@link java.io.DataInputStream} if it does
+     *          not already implement {@link DataInput}.
+     * @param compressedDecoder decoder for generic {@link KeyValue}s. Should support the expected
+     *          compression.
+     */
+    public CompressedIndexKeyValueDecoder(InputStream is, Decoder compressedDecoder) {
+      super(asDataInput(is));
+      this.decoder = compressedDecoder;
+    }
+
+    @Override
+    protected KeyValue parseCell() throws IOException {
+      // read the marker
+      int marker = this.in.read();
+      if (marker < 0) {
+        throw new EOFException(
+            "Unexepcted end of stream found while reading next (Indexed) KeyValue");
+      }
+
+      // do the normal thing, if it's a regular kv
+      if (marker == REGULAR_KEY_VALUE_MARKER) {
+        if (!this.decoder.advance()) {
+          throw new IOException("Could not read next key-value from generic KeyValue Decoder!");
+        }
+        return this.decoder.current();
+      }
+
+      // it's an IndexedKeyValue, so parse it out specially
+      return KeyValueCodec.readKeyValue((DataInput) this.in);
+    }
+  }
+
+  /**
+   * Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support
+   * compression.
+   */
+  private static class IndexKeyValueEncoder extends BaseEncoder {
+    public IndexKeyValueEncoder(OutputStream os) {
+      super(asDataOutput(os));
+    }
+
+    @Override
+    public void write(KeyValue cell) throws IOException {
+      // make sure we are open
+      checkFlushed();
+
+      // use the standard encoding mechanism
+      KeyValueCodec.write((DataOutput) this.out, cell);
+    }
+  }
+
+  /**
+   * Write {@link IndexedKeyValue}s along side compressed {@link KeyValue}s. This Encoder is
+   * <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed
+   * and uncompressed WALs that contain index entries.
+   */
+  private static class CompressedIndexKeyValueEncoder extends BaseEncoder {
+    private final Encoder compressedKvEncoder;
+
+    public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) {
+      super(asDataOutput(os));
+      this.compressedKvEncoder = compressedKvEncoder;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      this.compressedKvEncoder.flush();
+      super.flush();
+    }
+
+    @Override
+    public void write(KeyValue cell) throws IOException {
+      // make sure we are open
+      checkFlushed();
+
+      // write the marker so the decoder can tell which kind of kv follows, then serialize the kv
+      // the matching way
+      if (cell instanceof IndexedKeyValue) {
+        out.write(INDEXED_KEY_VALUE_MARKER);
+        KeyValueCodec.write((DataOutput) out, cell);
+      } else {
+        out.write(REGULAR_KEY_VALUE_MARKER);
+        this.compressedKvEncoder.write(cell);
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/CapturingAbortable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/CapturingAbortable.java b/src/main/java/org/apache/hbase/index/CapturingAbortable.java
new file mode 100644
index 0000000..f46ec3c
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/CapturingAbortable.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index;
+
+import org.apache.hadoop.hbase.Abortable;
+
+/**
+ * {@link Abortable} wrapper that remembers why it was aborted, so that the reason can be rethrown
+ * later through {@link #throwCauseIfAborted()}.
+ * <p>
+ * The aborted state itself belongs to the wrapped delegate. This class only records the reason and
+ * cause of the aborts it forwards.
+ */
+public class CapturingAbortable implements Abortable {
+
+  private final Abortable delegate;
+  private Throwable cause;
+  private String why;
+
+  /**
+   * @param delegate {@link Abortable} that actually performs the abort and tracks aborted state
+   */
+  public CapturingAbortable(Abortable delegate) {
+    this.delegate = delegate;
+  }
+
+  /**
+   * Record <tt>why</tt> and <tt>e</tt>, then forward the abort to the delegate. The call is
+   * ignored entirely (nothing recorded, nothing forwarded) if the delegate is already aborted.
+   */
+  @Override
+  public void abort(String why, Throwable e) {
+    if (!delegate.isAborted()) {
+      this.why = why;
+      this.cause = e;
+      delegate.abort(why, e);
+    }
+  }
+
+  /** @return whether the wrapped delegate reports itself as aborted */
+  @Override
+  public boolean isAborted() {
+    return delegate.isAborted();
+  }
+
+  /**
+   * Throw the cause of the abort, if <tt>this</tt> was aborted; otherwise return normally. If an
+   * exception caused the abort, that exception is re-thrown. Otherwise a generic {@link Exception}
+   * carrying the recorded reason is thrown; its message is <tt>null</tt> if the delegate was
+   * aborted without going through this wrapper.
+   * @throws Throwable the cause of the abort.
+   */
+  public void throwCauseIfAborted() throws Throwable {
+    if (isAborted()) {
+      throw cause != null ? cause : new Exception(why);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/IndexLogRollSynchronizer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/IndexLogRollSynchronizer.java b/src/main/java/org/apache/hbase/index/IndexLogRollSynchronizer.java
new file mode 100644
index 0000000..a3d1e22
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/IndexLogRollSynchronizer.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Ensure that the log isn't rolled while we are in the middle of doing a pending index write.
+ * <p>
+ * The problem we are trying to solve is the following sequence:
+ * <ol>
+ * <li>Write to the indexed table</li>
+ * <li>Write the index-containing WALEdit</li>
+ * <li>Start writing to the index tables in the postXXX hook</li>
+ * <li>WAL gets rolled and archived</li>
+ * <li>An index update fails, in which case we should kill ourselves to get WAL replay</li>
+ * <li>Since the WAL got archived, we won't get the replay of the index writes</li>
+ * </ol>
+ * <p>
+ * The usual course of events should be:
+ * <ol>
+ * <li>In a preXXX hook,
+ * <ol>
+ * <li>Build the {@link WALEdit} + index information</li>
+ * <li>Lock the read lock paired with the write lock given to this class. This is a reentrant read
+ * lock on the WAL archiving, so we can make multiple WAL/index updates concurrently.</li>
+ * </ol>
+ * </li>
+ * <li>Pass that {@link WALEdit} to the WAL, ensuring it's durable and replayable</li>
+ * <li>In the corresponding postXXX,
+ * <ol>
+ * <li>make the updates to the index tables</li>
+ * <li>Unlock the read lock</li>
+ * </ol>
+ * </li>
+ * </ol>
+ * <p>
+ * Meanwhile, <tt>this</tt> takes the write lock in {@link #preLogArchive(Path, Path)} and releases
+ * it in {@link #postLogArchive(Path, Path)}. A WAL therefore cannot be archived while any index
+ * update still holds the read lock.
+ * <p>
+ * <tt>this</tt> must be registered as a {@link WALActionsListener} on the WAL whose archiving it
+ * should guard; that registration happens outside this class.
+ */
+public class IndexLogRollSynchronizer implements WALActionsListener {
+
+  private static final Log LOG = LogFactory.getLog(IndexLogRollSynchronizer.class);
+  private final WriteLock logArchiveLock;
+
+  /**
+   * @param logWriteLock write half of the lock whose read half is held by in-flight index updates
+   */
+  public IndexLogRollSynchronizer(WriteLock logWriteLock) {
+    this.logArchiveLock = logWriteLock;
+  }
+
+  /**
+   * Block until no index update holds the read lock, then keep the write lock until
+   * {@link #postLogArchive(Path, Path)}.
+   * <p>
+   * NOTE(review): the lock is released only in postLogArchive. If archiving fails after this
+   * callback and postLogArchive is never invoked, the write lock stays held and index updates
+   * block - confirm how the WAL handles a failed archive. Also, WriteLock.unlock() must run on the
+   * thread that took the lock, so both callbacks are assumed to be made from the same thread.
+   */
+  @Override
+  public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+    // take a write lock on the index - any pending index updates will complete before we finish
+    LOG.debug("Taking INDEX_UPDATE writelock");
+    logArchiveLock.lock();
+    LOG.debug("Got the INDEX_UPDATE writelock");
+  }
+
+  @Override
+  public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+    // done archiving the logs, any WAL updates will be replayed on failure
+    LOG.debug("Releasing INDEX_UPDATE writelock");
+    logArchiveLock.unlock();
+  }
+
+  @Override
+  public void logCloseRequested() {
+    // don't care - before this is called, all the HRegions are closed, so we can't get any new
+    // requests and all pending requests can finish before the WAL closes.
+  }
+
+  @Override
+  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+    // noop
+  }
+
+  @Override
+  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+    // noop
+  }
+
+  @Override
+  public void logRollRequested() {
+    // noop
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
+    // noop
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
+    // noop
+  }
+}
\ No newline at end of file