You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by si...@apache.org on 2015/02/05 06:26:13 UTC

tajo git commit: TAJO-919: Implement LAG and LEAD window functions. (Keuntae Park)

Repository: tajo
Updated Branches:
  refs/heads/master 5e024f947 -> 901700b26


TAJO-919: Implement LAG and LEAD window functions. (Keuntae Park)

Closes #351


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/901700b2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/901700b2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/901700b2

Branch: refs/heads/master
Commit: 901700b26f6862738adf5bfc8cbe718cd6a19627
Parents: 5e024f9
Author: Keuntae Park <si...@apache.org>
Authored: Thu Feb 5 14:23:41 2015 +0900
Committer: Keuntae Park <si...@apache.org>
Committed: Thu Feb 5 14:23:41 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/engine/parser/SQLLexer.g4   |  2 +
 .../org/apache/tajo/engine/parser/SQLParser.g4  |  2 +
 .../tajo/engine/function/builtin/Lead.java      | 92 ++++++++++++++++++
 .../tajo/engine/function/builtin/LeadDate.java  | 49 ++++++++++
 .../engine/function/builtin/LeadDouble.java     | 49 ++++++++++
 .../tajo/engine/function/builtin/LeadFloat.java | 49 ++++++++++
 .../tajo/engine/function/builtin/LeadInt.java   | 49 ++++++++++
 .../tajo/engine/function/builtin/LeadLong.java  | 49 ++++++++++
 .../engine/function/builtin/LeadString.java     | 49 ++++++++++
 .../tajo/engine/function/builtin/LeadTime.java  | 49 ++++++++++
 .../engine/function/builtin/LeadTimestamp.java  | 49 ++++++++++
 .../apache/tajo/engine/function/window/Lag.java | 82 ++++++++++++++++
 .../tajo/engine/function/window/LagDate.java    | 42 +++++++++
 .../tajo/engine/function/window/LagDouble.java  | 42 +++++++++
 .../tajo/engine/function/window/LagFloat.java   | 42 +++++++++
 .../tajo/engine/function/window/LagInt.java     | 42 +++++++++
 .../tajo/engine/function/window/LagLong.java    | 42 +++++++++
 .../tajo/engine/function/window/LagString.java  | 42 +++++++++
 .../tajo/engine/function/window/LagTime.java    | 42 +++++++++
 .../engine/function/window/LagTimestamp.java    | 42 +++++++++
 .../apache/tajo/engine/parser/SQLAnalyzer.java  | 26 ++++++
 .../engine/planner/physical/WindowAggExec.java  |  2 +-
 .../tajo/engine/query/TestWindowQuery.java      | 98 ++++++++++++++++++++
 .../queries/TestWindowQuery/testLag1.sql        | 18 ++++
 .../TestWindowQuery/testLagWithDefault.sql      | 18 ++++
 .../TestWindowQuery/testLagWithNoArgs.sql       | 18 ++++
 .../queries/TestWindowQuery/testLead1.sql       | 18 ++++
 .../TestWindowQuery/testLeadWithDefault.sql     | 18 ++++
 .../TestWindowQuery/testLeadWithNoArgs.sql      | 18 ++++
 .../results/TestWindowQuery/testLag1.result     |  7 ++
 .../TestWindowQuery/testLagWithDefault.result   |  7 ++
 .../TestWindowQuery/testLagWithNoArgs.result    |  7 ++
 .../results/TestWindowQuery/testLead1.result    |  7 ++
 .../TestWindowQuery/testLeadWithDefault.result  |  7 ++
 .../TestWindowQuery/testLeadWithNoArgs.result   |  7 ++
 .../org/apache/tajo/plan/ExprAnnotator.java     |  6 +-
 .../org/apache/tajo/plan/TypeDeterminant.java   |  4 +
 38 files changed, 1192 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e9a9332..2e4849f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,6 +5,8 @@ Release 0.10.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-919: Implement LAG and LEAD window functions. (Keuntae Park)
+
     TAJO-920: Add FIRST_VALUE and LAST_VALUE window functions. 
     (Keuntae Park via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
index 48a6fbe..0c144f7 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
@@ -249,8 +249,10 @@ INTERSECTION : I N T E R S E C T I O N;
 ISODOW : I S O D O W;
 ISOYEAR : I S O Y E A R;
 
+LAG : L A G;
 LAST : L A S T;
 LAST_VALUE : L A S T UNDERLINE V A L U E;
+LEAD : L E A D;
 LESS : L E S S;
 LIST : L I S T;
 LOCATION : L O C A T I O N;

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 0bc89db..ddf679b 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -631,6 +631,8 @@ window_function_type
   | aggregate_function
   | FIRST_VALUE LEFT_PAREN column_reference RIGHT_PAREN
   | LAST_VALUE LEFT_PAREN column_reference RIGHT_PAREN
+  | LAG LEFT_PAREN column_reference ( COMMA numeric_value_expression ( COMMA common_value_expression )? )? RIGHT_PAREN
+  | LEAD LEFT_PAREN column_reference ( COMMA numeric_value_expression ( COMMA common_value_expression )? )? RIGHT_PAREN
   ;
 
 rank_function_type

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Lead.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Lead.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Lead.java
new file mode 100644
index 0000000..dc135db
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Lead.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.plan.function.AggFunction;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.LinkedList;
+
+public abstract class Lead extends AggFunction<Datum> {
+
+  public Lead(Column[] columns) {
+    super(columns);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new LeadContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+    LeadContext leadCtx = (LeadContext)ctx;
+    if (leadCtx.leadNum < 0) {
+      if (params.size() == 1) {
+        leadCtx.leadNum = 1;
+      } else {
+        leadCtx.leadNum = params.get(1).asInt4();
+      }
+    }
+
+    if (leadCtx.leadNum > 0) {
+      leadCtx.leadNum --;
+    } else {
+      leadCtx.leadBuffer.add(params.get(0));
+    }
+
+    if (leadCtx.defaultDatum == null) {
+      if (params.size() == 3) {
+        leadCtx.defaultDatum = params.get(2);
+      } else {
+        leadCtx.defaultDatum = NullDatum.get();
+      }
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    LeadContext leadCtx = (LeadContext)ctx;
+    if (leadCtx.leadBuffer.isEmpty()) {
+      return leadCtx.defaultDatum;
+    } else {
+      return leadCtx.leadBuffer.removeFirst();
+    }
+  }
+
+  @Override
+  public Datum terminate(FunctionContext ctx) {
+    LeadContext leadCtx = (LeadContext)ctx;
+    if (leadCtx.leadBuffer.isEmpty()) {
+      return leadCtx.defaultDatum;
+    } else {
+      return leadCtx.leadBuffer.removeFirst();
+    }
+  }
+
+  private class LeadContext implements FunctionContext {
+    LinkedList<Datum> leadBuffer = new LinkedList<Datum>();
+    int leadNum = -1;
+    Datum defaultDatum = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDate.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDate.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDate.java
new file mode 100644
index 0000000..17d2c3a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDate.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.DATE,
+    paramTypes = {@ParamTypes(paramTypes = {Type.DATE}), @ParamTypes(paramTypes = {Type.DATE, Type.INT4}), @ParamTypes(paramTypes = {Type.DATE, Type.INT4, Type.DATE})}
+)
+public class LeadDate extends Lead {
+
+  public LeadDate() {
+    super(new Column[] {
+        new Column("col", Type.DATE),
+        new Column("num", Type.INT4),
+        new Column("default", Type.DATE)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.DATE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDouble.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDouble.java
new file mode 100644
index 0000000..90c0556
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadDouble.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.FLOAT8,
+    paramTypes = {@ParamTypes(paramTypes = {Type.FLOAT8}), @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4}), @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4, Type.FLOAT8})}
+)
+public class LeadDouble extends Lead {
+
+  public LeadDouble() {
+    super(new Column[] {
+        new Column("col", Type.FLOAT8),
+        new Column("num", Type.INT4),
+        new Column("default", Type.FLOAT8)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadFloat.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadFloat.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadFloat.java
new file mode 100644
index 0000000..cdbf6bc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadFloat.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.FLOAT4,
+    paramTypes = {@ParamTypes(paramTypes = {Type.FLOAT4}), @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4}), @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4, Type.FLOAT4})}
+)
+public class LeadFloat extends Lead {
+
+  public LeadFloat() {
+    super(new Column[] {
+        new Column("col", Type.FLOAT4),
+        new Column("num", Type.INT4),
+        new Column("default", Type.FLOAT4)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT4);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadInt.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadInt.java
new file mode 100644
index 0000000..b9d5dd7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadInt.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.INT4,
+    paramTypes = {@ParamTypes(paramTypes = {Type.INT4}), @ParamTypes(paramTypes = {Type.INT4, Type.INT4}), @ParamTypes(paramTypes = {Type.INT4, Type.INT4, Type.INT4})}
+)
+public class LeadInt extends Lead {
+
+  public LeadInt() {
+    super(new Column[] {
+        new Column("col", Type.INT4),
+        new Column("num", Type.INT4),
+        new Column("default", Type.INT4)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT4);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadLong.java
new file mode 100644
index 0000000..46dcd38
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadLong.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.INT8,
+    paramTypes = {@ParamTypes(paramTypes = {Type.INT8}), @ParamTypes(paramTypes = {Type.INT8, Type.INT4}), @ParamTypes(paramTypes = {Type.INT8, Type.INT4, Type.INT8})}
+)
+public class LeadLong extends Lead {
+
+  public LeadLong() {
+    super(new Column[] {
+        new Column("col", Type.INT8),
+        new Column("num", Type.INT4),
+        new Column("default", Type.INT8)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadString.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadString.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadString.java
new file mode 100644
index 0000000..b919775
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadString.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.TEXT,
+    paramTypes = {@ParamTypes(paramTypes = {Type.TEXT}), @ParamTypes(paramTypes = {Type.TEXT, Type.INT4}), @ParamTypes(paramTypes = {Type.TEXT, Type.INT4, Type.TEXT})}
+)
+public class LeadString extends Lead {
+
+  public LeadString() {
+    super(new Column[] {
+        new Column("col", Type.TEXT),
+        new Column("num", Type.INT4),
+        new Column("default", Type.TEXT)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.TEXT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTime.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTime.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTime.java
new file mode 100644
index 0000000..062eaff
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTime.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.TIME,
+    paramTypes = {@ParamTypes(paramTypes = {Type.TIME}), @ParamTypes(paramTypes = {Type.TIME, Type.INT4}), @ParamTypes(paramTypes = {Type.TIME, Type.INT4, Type.TIME})}
+)
+public class LeadTime extends Lead {
+
+  public LeadTime() {
+    super(new Column[] {
+        new Column("col", Type.TIME),
+        new Column("num", Type.INT4),
+        new Column("default", Type.TIME)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.TIME);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTimestamp.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTimestamp.java
new file mode 100644
index 0000000..f5f3663
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/LeadTimestamp.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lead",
+    description = "the nth next row value from current row",
+    example = "> SELECT lead(column, n) OVER ();",
+    returnType = Type.TIMESTAMP,
+    paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP}), @ParamTypes(paramTypes = {Type.TIMESTAMP, Type.INT4}), @ParamTypes(paramTypes = {Type.TIMESTAMP, Type.INT4, Type.TIMESTAMP})}
+)
+public class LeadTimestamp extends Lead {
+
+  public LeadTimestamp() {
+    super(new Column[] {
+        new Column("col", Type.TIMESTAMP),
+        new Column("num", Type.INT4),
+        new Column("default", Type.TIMESTAMP)
+    });
+  }
+
+  @Override
+  public TajoDataTypes.DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.TIMESTAMP);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java
new file mode 100644
index 0000000..286cf03
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/Lag.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.plan.function.WindowAggFunc;
+import org.apache.tajo.storage.Tuple;
+
+public abstract class Lag extends WindowAggFunc<Datum> {
+
+  public Lag(Column[] columns) {
+    super(columns);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new LagContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+    LagContext lagCtx = (LagContext)ctx;
+    if(lagCtx.lagBuffer == null) {
+      int lagNum = 0;
+      if (params.size() == 1) {
+        lagNum = 1;
+      } else {
+        lagNum = params.get(1).asInt4();
+      }
+      lagCtx.lagBuffer = new CircularFifoBuffer(lagNum+1);
+    }
+
+    if (params.get(0).isNotNull()) {
+      lagCtx.lagBuffer.add(params.get(0));
+    } else {
+      lagCtx.lagBuffer.add(NullDatum.get());
+    }
+
+    if (lagCtx.defaultDatum == null) {
+     if (params.size() == 3) {
+       lagCtx.defaultDatum = params.get(2);
+     } else {
+       lagCtx.defaultDatum = NullDatum.get();
+     }
+    }
+  }
+
+  @Override
+  public Datum terminate(FunctionContext ctx) {
+    LagContext lagCtx = (LagContext)ctx;
+    if(lagCtx.lagBuffer.isFull()) {
+      return (Datum)(lagCtx.lagBuffer.get());
+    } else {
+      return lagCtx.defaultDatum;
+    }
+  }
+
+  protected class LagContext implements FunctionContext {
+    CircularFifoBuffer lagBuffer = null;
+    Datum defaultDatum = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDate.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDate.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDate.java
new file mode 100644
index 0000000..b0d0101
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDate.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.DATE,
+    paramTypes = {@ParamTypes(paramTypes = {Type.DATE}), @ParamTypes(paramTypes = {Type.DATE, Type.INT4}), @ParamTypes(paramTypes = {Type.DATE, Type.INT4, Type.DATE})}
+)
+public class LagDate extends Lag {
+
+  public LagDate() {
+    super(new Column[] {
+        new Column("col", Type.DATE),
+        new Column("num", Type.INT4),
+        new Column("default", Type.DATE)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDouble.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDouble.java
new file mode 100644
index 0000000..f5c1e25
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagDouble.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.FLOAT8,
+    paramTypes = {@ParamTypes(paramTypes = {Type.FLOAT8}), @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4}), @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4, Type.FLOAT8})}
+)
+public class LagDouble extends Lag {
+
+  public LagDouble() {
+    super(new Column[] {
+        new Column("col", Type.FLOAT8),
+        new Column("num", Type.INT4),
+        new Column("default", Type.FLOAT8)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagFloat.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagFloat.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagFloat.java
new file mode 100644
index 0000000..0cae9af
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagFloat.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.FLOAT4,
+    paramTypes = {@ParamTypes(paramTypes = {Type.FLOAT4}), @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4}), @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4, Type.FLOAT4})}
+)
+public class LagFloat extends Lag {
+
+  public LagFloat() {
+    super(new Column[] {
+        new Column("col", Type.FLOAT4),
+        new Column("num", Type.INT4),
+        new Column("default", Type.FLOAT4)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagInt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagInt.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagInt.java
new file mode 100644
index 0000000..3319284
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagInt.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.INT4,
+    paramTypes = {@ParamTypes(paramTypes = {Type.INT4}), @ParamTypes(paramTypes = {Type.INT4, Type.INT4}), @ParamTypes(paramTypes = {Type.INT4, Type.INT4, Type.INT4})}
+)
+public class LagInt extends Lag {
+
+  public LagInt() {
+    super(new Column[] {
+        new Column("col", Type.INT4),
+        new Column("num", Type.INT4),
+        new Column("default", Type.INT4)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagLong.java
new file mode 100644
index 0000000..b3bb87a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagLong.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.INT8,
+    paramTypes = {@ParamTypes(paramTypes = {Type.INT8}), @ParamTypes(paramTypes = {Type.INT8, Type.INT4}), @ParamTypes(paramTypes = {Type.INT8, Type.INT4, Type.INT8})}
+)
+public class LagLong extends Lag {
+
+  public LagLong() {
+    super(new Column[] {
+        new Column("col", Type.INT8),
+        new Column("num", Type.INT4),
+        new Column("default", Type.INT8)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagString.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagString.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagString.java
new file mode 100644
index 0000000..777e6aa
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagString.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.TEXT,
+    paramTypes = {@ParamTypes(paramTypes = {Type.TEXT}), @ParamTypes(paramTypes = {Type.TEXT, Type.INT4}), @ParamTypes(paramTypes = {Type.TEXT, Type.INT4, Type.TEXT})}
+)
+public class LagString extends Lag {
+
+  public LagString() {
+    super(new Column[] {
+        new Column("col", Type.TEXT),
+        new Column("num", Type.INT4),
+        new Column("default", Type.TEXT)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTime.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTime.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTime.java
new file mode 100644
index 0000000..e29a54a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTime.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.TIME,
+    paramTypes = {@ParamTypes(paramTypes = {Type.TIME}), @ParamTypes(paramTypes = {Type.TIME, Type.INT4}), @ParamTypes(paramTypes = {Type.TIME, Type.INT4, Type.TIME})}
+)
+public class LagTime extends Lag {
+
+  public LagTime() {
+    super(new Column[] {
+        new Column("col", Type.TIME),
+        new Column("num", Type.INT4),
+        new Column("default", Type.TIME)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTimestamp.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTimestamp.java
new file mode 100644
index 0000000..fea5494
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/window/LagTimestamp.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.window;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+
+@Description(
+    functionName = "lag",
+    description = "the nth previous row value of current row",
+    example = "> SELECT lag(column, n) OVER ();",
+    returnType = Type.TIMESTAMP,
+    paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP}), @ParamTypes(paramTypes = {Type.TIMESTAMP, Type.INT4}), @ParamTypes(paramTypes = {Type.TIMESTAMP, Type.INT4, Type.TIMESTAMP})}
+)
+public class LagTimestamp extends Lag {
+
+  public LagTimestamp() {
+    super(new Column[] {
+        new Column("col", Type.TIMESTAMP),
+        new Column("num", Type.INT4),
+        new Column("default", Type.TIMESTAMP)
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 3669625..9ac1938 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -383,6 +383,32 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       functionBody = new GeneralSetFunctionExpr("first_value", false, new Expr[]{ visitColumn_reference(functionType.column_reference())});
     } else if (checkIfExist(functionType.LAST_VALUE())) {
       functionBody = new GeneralSetFunctionExpr("last_value", false, new Expr[]{visitColumn_reference(functionType.column_reference())});
+    } else if (checkIfExist(functionType.LAG())) {
+      if (checkIfExist(functionType.numeric_value_expression())) {
+        if (checkIfExist(functionType.common_value_expression())) {
+          functionBody = new GeneralSetFunctionExpr("lag", false, new Expr[]{visitColumn_reference(functionType.column_reference()),
+              visitNumeric_value_expression(functionType.numeric_value_expression()),
+              visitCommon_value_expression(functionType.common_value_expression())});
+        } else {
+          functionBody = new GeneralSetFunctionExpr("lag", false, new Expr[]{visitColumn_reference(functionType.column_reference()),
+              visitNumeric_value_expression(functionType.numeric_value_expression())});
+        }
+      } else {
+        functionBody = new GeneralSetFunctionExpr("lag", false, new Expr[]{visitColumn_reference(functionType.column_reference())});
+      }
+    } else if (checkIfExist(functionType.LEAD())) {
+      if (checkIfExist(functionType.numeric_value_expression())) {
+        if (checkIfExist(functionType.common_value_expression())) {
+          functionBody = new GeneralSetFunctionExpr("lead", false, new Expr[]{visitColumn_reference(functionType.column_reference()),
+              visitNumeric_value_expression(functionType.numeric_value_expression()),
+              visitCommon_value_expression(functionType.common_value_expression())});
+        } else {
+          functionBody = new GeneralSetFunctionExpr("lead", false, new Expr[]{visitColumn_reference(functionType.column_reference()),
+              visitNumeric_value_expression(functionType.numeric_value_expression())});
+        }
+      } else {
+        functionBody = new GeneralSetFunctionExpr("lead", false, new Expr[]{visitColumn_reference(functionType.column_reference())});
+      }
     } else {
       functionBody = visitAggregate_function(functionType.aggregate_function());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index a36bd4f..1a7dff9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -314,8 +314,8 @@ public class WindowAggExec extends UnaryPhysicalExec {
       }
 
       if (aggFuncFlags[idx]) {
-        Datum result = functions[idx].terminate(contexts[idx]);
         for (int i = 0; i < evaluatedTuples.size(); i++) {
+          Datum result = functions[idx].terminate(contexts[idx]);
           Tuple outTuple = evaluatedTuples.get(i);
           outTuple.put(nonFunctionColumnNum + idx, result);
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
index 14ab58f..9385db0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
@@ -323,6 +323,104 @@ public class TestWindowQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testLag1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLagTime() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("time", TajoDataTypes.Type.TIME);
+    String[] data = new String[]{ "1|12:11:12", "2|10:11:13", "2|05:42:41" };
+    TajoTestingCluster.createTable("lagtime", schema, tableOptions, data, 1);
+
+    try {
+      ResultSet res = executeString(
+          "select id, lag(time, 1) over ( partition by id order by time ) as time_lag from lagtime");
+      String ascExpected = "id,time_lag\n" +
+          "-------------------------------\n" +
+          "1,null\n" +
+          "2,null\n" +
+          "2,05:42:41\n";
+
+      assertEquals(ascExpected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE lagtime PURGE");
+    }
+  }
+
+  @Test
+  public final void testLagWithNoArgs() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLagWithDefault() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLead1() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeadTime() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("time", TajoDataTypes.Type.TIME);
+    String[] data = new String[]{ "1|12:11:12", "2|10:11:13", "2|05:42:41" };
+    TajoTestingCluster.createTable("leadtime", schema, tableOptions, data, 1);
+
+    try {
+      ResultSet res = executeString(
+          "select id, lead(time, 1) over ( partition by id order by time ) as time_lead from leadtime");
+      String ascExpected = "id,time_lead\n" +
+          "-------------------------------\n" +
+          "1,null\n" +
+          "2,10:11:13\n" +
+          "2,null\n";
+
+      assertEquals(ascExpected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE leadtime PURGE");
+    }
+  }
+
+  @Test
+  public final void testLeadWithNoArgs() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testLeadWithDefault() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
   public final void testMultipleWindow() throws Exception {
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/queries/TestWindowQuery/testLag1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestWindowQuery/testLag1.sql b/tajo-core/src/test/resources/queries/TestWindowQuery/testLag1.sql
new file mode 100644
index 0000000..1fd1e9e
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestWindowQuery/testLag1.sql
@@ -0,0 +1,18 @@
+SELECT
+  lag(l_shipmode, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipmode_lag,
+  lag(l_linenumber, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as linenumber_lag,
+  lag(l_suppkey_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as suppkey_lag,
+  lag(l_shipdate_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipdate_lag,
+  lag(l_commitdate_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as commitdate_lag,
+  lag(l_extendedprice, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as extendedprice_lag,
+  lag(l_discount_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as discount_lag,
+  l_orderkey
+FROM
+(
+  SELECT
+    l_orderkey,l_partkey,l_suppkey::INT8 as l_suppkey_t,l_linenumber,l_quantity,
+    l_extendedprice,l_discount::FLOAT4 as l_discount_t,l_tax,l_returnflag,l_linestatus,
+    l_shipdate::DATE as l_shipdate_t,l_commitdate::TIMESTAMP as l_commitdate_t,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
+  FROM
+    LINEITEM
+) xx
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithDefault.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithDefault.sql b/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithDefault.sql
new file mode 100644
index 0000000..3638498
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithDefault.sql
@@ -0,0 +1,18 @@
+SELECT
+  lag(l_shipmode, 1, 'default') over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipmode_lag,
+  lag(l_linenumber, 1, 100) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as linenumber_lag,
+  lag(l_suppkey_t, 1, 1000::int8) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as suppkey_lag,
+  lag(l_shipdate_t, 1, '15-01-01'::date) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipdate_lag,
+  lag(l_commitdate_t, 1, '15-01-01 12:00:00'::timestamp) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as commitdate_lag,
+  lag(l_extendedprice, 1, 1.234::float8) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as extendedprice_lag,
+  lag(l_discount_t, 1, 0.11::float4) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as discount_lag,
+  l_orderkey
+FROM
+(
+  SELECT
+    l_orderkey,l_partkey,l_suppkey::INT8 as l_suppkey_t,l_linenumber,l_quantity,
+    l_extendedprice,l_discount::FLOAT4 as l_discount_t,l_tax,l_returnflag,l_linestatus,
+    l_shipdate::DATE as l_shipdate_t,l_commitdate::TIMESTAMP as l_commitdate_t,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
+  FROM
+    LINEITEM
+) xx
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithNoArgs.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithNoArgs.sql b/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithNoArgs.sql
new file mode 100644
index 0000000..1d551d6
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestWindowQuery/testLagWithNoArgs.sql
@@ -0,0 +1,18 @@
+SELECT
+  lag(l_shipmode) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipmode_lag,
+  lag(l_linenumber) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as linenumber_lag,
+  lag(l_suppkey_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as suppkey_lag,
+  lag(l_shipdate_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipdate_lag,
+  lag(l_commitdate_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as commitdate_lag,
+  lag(l_extendedprice) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as extendedprice_lag,
+  lag(l_discount_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as discount_lag,
+  l_orderkey
+FROM
+(
+  SELECT
+    l_orderkey,l_partkey,l_suppkey::INT8 as l_suppkey_t,l_linenumber,l_quantity,
+    l_extendedprice,l_discount::FLOAT4 as l_discount_t,l_tax,l_returnflag,l_linestatus,
+    l_shipdate::DATE as l_shipdate_t,l_commitdate::TIMESTAMP as l_commitdate_t,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
+  FROM
+    LINEITEM
+) xx
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/queries/TestWindowQuery/testLead1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestWindowQuery/testLead1.sql b/tajo-core/src/test/resources/queries/TestWindowQuery/testLead1.sql
new file mode 100644
index 0000000..1c6cc20
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestWindowQuery/testLead1.sql
@@ -0,0 +1,18 @@
+SELECT
+  lead(l_shipmode, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipmode_lead,
+  lead(l_linenumber, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as linenumber_lead,
+  lead(l_suppkey_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as suppkey_lead,
+  lead(l_shipdate_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipdate_lead,
+  lead(l_commitdate_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as commitdate_lead,
+  lead(l_extendedprice, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as extendedprice_lead,
+  lead(l_discount_t, 1) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as discount_lead,
+  l_orderkey
+FROM
+(
+  SELECT
+    l_orderkey,l_partkey,l_suppkey::INT8 as l_suppkey_t,l_linenumber,l_quantity,
+    l_extendedprice,l_discount::FLOAT4 as l_discount_t,l_tax,l_returnflag,l_linestatus,
+    l_shipdate::DATE as l_shipdate_t,l_commitdate::TIMESTAMP as l_commitdate_t,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
+  FROM
+    LINEITEM
+) xx
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithDefault.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithDefault.sql b/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithDefault.sql
new file mode 100644
index 0000000..792d47f
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithDefault.sql
@@ -0,0 +1,18 @@
+SELECT
+  lead(l_shipmode, 1, 'default') over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipmode_lead,
+  lead(l_linenumber, 1, 100) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as linenumber_lead,
+  lead(l_suppkey_t, 1, 1000::int8) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as suppkey_lead,
+  lead(l_shipdate_t, 1, '15-01-01'::date) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipdate_lead,
+  lead(l_commitdate_t, 1, '15-01-01 12:00:00'::timestamp) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as commitdate_lead,
+  lead(l_extendedprice, 1, 1.234::float8) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as extendedprice_lead,
+  lead(l_discount_t, 1, 0.11::float4) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as discount_lead,
+  l_orderkey
+FROM
+(
+  SELECT
+    l_orderkey,l_partkey,l_suppkey::INT8 as l_suppkey_t,l_linenumber,l_quantity,
+    l_extendedprice,l_discount::FLOAT4 as l_discount_t,l_tax,l_returnflag,l_linestatus,
+    l_shipdate::DATE as l_shipdate_t,l_commitdate::TIMESTAMP as l_commitdate_t,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
+  FROM
+    LINEITEM
+) xx
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithNoArgs.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithNoArgs.sql b/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithNoArgs.sql
new file mode 100644
index 0000000..7f5f940
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestWindowQuery/testLeadWithNoArgs.sql
@@ -0,0 +1,18 @@
+SELECT
+  lead(l_shipmode) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipmode_lead,
+  lead(l_linenumber) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as linenumber_lead,
+  lead(l_suppkey_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as suppkey_lead,
+  lead(l_shipdate_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as shipdate_lead,
+  lead(l_commitdate_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as commitdate_lead,
+  lead(l_extendedprice) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as extendedprice_lead,
+  lead(l_discount_t) over (PARTITION BY L_ORDERKEY order by l_shipmode ) as discount_lead,
+  l_orderkey
+FROM
+(
+  SELECT
+    l_orderkey,l_partkey,l_suppkey::INT8 as l_suppkey_t,l_linenumber,l_quantity,
+    l_extendedprice,l_discount::FLOAT4 as l_discount_t,l_tax,l_returnflag,l_linestatus,
+    l_shipdate::DATE as l_shipdate_t,l_commitdate::TIMESTAMP as l_commitdate_t,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
+  FROM
+    LINEITEM
+) xx
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/results/TestWindowQuery/testLag1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testLag1.result b/tajo-core/src/test/resources/results/TestWindowQuery/testLag1.result
new file mode 100644
index 0000000..d64725d
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestWindowQuery/testLag1.result
@@ -0,0 +1,7 @@
+shipmode_lag,linenumber_lag,suppkey_lag,shipdate_lag,commitdate_lag,extendedprice_lag,discount_lag,l_orderkey
+-------------------------------
+null,null,null,null,null,null,null,1
+MAIL,2,7311,1996-04-12,1996-02-28 00:00:00,45983.16,0.09,1
+null,null,null,null,null,null,null,2
+null,null,null,null,null,null,null,3
+AIR,1,1798,1994-02-02,1994-01-04 00:00:00,54058.05,0.06,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithDefault.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithDefault.result b/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithDefault.result
new file mode 100644
index 0000000..2b10477
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithDefault.result
@@ -0,0 +1,7 @@
+shipmode_lag,linenumber_lag,suppkey_lag,shipdate_lag,commitdate_lag,extendedprice_lag,discount_lag,l_orderkey
+-------------------------------
+default,100,1000,2015-01-01,2015-01-01 12:00:00,1.234,0.11,1
+MAIL,2,7311,1996-04-12,1996-02-28 00:00:00,45983.16,0.09,1
+default,100,1000,2015-01-01,2015-01-01 12:00:00,1.234,0.11,2
+default,100,1000,2015-01-01,2015-01-01 12:00:00,1.234,0.11,3
+AIR,1,1798,1994-02-02,1994-01-04 00:00:00,54058.05,0.06,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithNoArgs.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithNoArgs.result b/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithNoArgs.result
new file mode 100644
index 0000000..d64725d
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestWindowQuery/testLagWithNoArgs.result
@@ -0,0 +1,7 @@
+shipmode_lag,linenumber_lag,suppkey_lag,shipdate_lag,commitdate_lag,extendedprice_lag,discount_lag,l_orderkey
+-------------------------------
+null,null,null,null,null,null,null,1
+MAIL,2,7311,1996-04-12,1996-02-28 00:00:00,45983.16,0.09,1
+null,null,null,null,null,null,null,2
+null,null,null,null,null,null,null,3
+AIR,1,1798,1994-02-02,1994-01-04 00:00:00,54058.05,0.06,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/results/TestWindowQuery/testLead1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testLead1.result b/tajo-core/src/test/resources/results/TestWindowQuery/testLead1.result
new file mode 100644
index 0000000..a85ead5
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestWindowQuery/testLead1.result
@@ -0,0 +1,7 @@
+shipmode_lead,linenumber_lead,suppkey_lead,shipdate_lead,commitdate_lead,extendedprice_lead,discount_lead,l_orderkey
+-------------------------------
+TRUCK,1,7706,1996-03-13,1996-02-12 00:00:00,21168.23,0.04,1
+null,null,null,null,null,null,null,1
+null,null,null,null,null,null,null,2
+RAIL,2,6540,1993-11-09,1993-12-20 00:00:00,46796.47,0.1,3
+null,null,null,null,null,null,null,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithDefault.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithDefault.result b/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithDefault.result
new file mode 100644
index 0000000..8298e44
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithDefault.result
@@ -0,0 +1,7 @@
+shipmode_lead,linenumber_lead,suppkey_lead,shipdate_lead,commitdate_lead,extendedprice_lead,discount_lead,l_orderkey
+-------------------------------
+TRUCK,1,7706,1996-03-13,1996-02-12 00:00:00,21168.23,0.04,1
+default,100,1000,2015-01-01,2015-01-01 12:00:00,1.234,0.11,1
+default,100,1000,2015-01-01,2015-01-01 12:00:00,1.234,0.11,2
+RAIL,2,6540,1993-11-09,1993-12-20 00:00:00,46796.47,0.1,3
+default,100,1000,2015-01-01,2015-01-01 12:00:00,1.234,0.11,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithNoArgs.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithNoArgs.result b/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithNoArgs.result
new file mode 100644
index 0000000..a85ead5
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestWindowQuery/testLeadWithNoArgs.result
@@ -0,0 +1,7 @@
+shipmode_lead,linenumber_lead,suppkey_lead,shipdate_lead,commitdate_lead,extendedprice_lead,discount_lead,l_orderkey
+-------------------------------
+TRUCK,1,7706,1996-03-13,1996-02-12 00:00:00,21168.23,0.04,1
+null,null,null,null,null,null,null,1
+null,null,null,null,null,null,null,2
+RAIL,2,6540,1993-11-09,1993-12-20 00:00:00,46796.47,0.1,3
+null,null,null,null,null,null,null,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
index c8eaffc..235bebf 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/ExprAnnotator.java
@@ -679,7 +679,7 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
   }
 
   public static final Set<String> WINDOW_FUNCTIONS =
-      Sets.newHashSet("row_number", "rank", "dense_rank", "percent_rank", "cume_dist", "first_value");
+      Sets.newHashSet("row_number", "rank", "dense_rank", "percent_rank", "cume_dist", "first_value", "lag");
 
   public EvalNode visitWindowFunction(Context ctx, Stack<Expr> stack, WindowFunctionExpr windowFunc)
       throws PlanningException {
@@ -721,6 +721,10 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
       } else {
         paramTypes[0] = givenArgs[0].getValueType();
       }
+      for (int i = 1; i < params.length; i++) {
+        givenArgs[i] = visit(ctx, stack, params[i]);
+        paramTypes[i] = givenArgs[i].getValueType();
+      }
     } else {
       if (windowFunc.getSignature().equalsIgnoreCase("rank")) {
         givenArgs = sortKeys != null ? sortKeys : new EvalNode[0];

http://git-wip-us.apache.org/repos/asf/tajo/blob/901700b2/tajo-plan/src/main/java/org/apache/tajo/plan/TypeDeterminant.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/TypeDeterminant.java b/tajo-plan/src/main/java/org/apache/tajo/plan/TypeDeterminant.java
index 8605b3d..6222734 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/TypeDeterminant.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/TypeDeterminant.java
@@ -235,6 +235,10 @@ public class TypeDeterminant extends SimpleAlgebraVisitor<LogicalPlanner.PlanCon
       } else {
         paramTypes[0] = givenArgs[0];
       }
+      for (int i = 1; i < params.length; i++) {
+        givenArgs[i] = visit(ctx, stack, params[i]);
+        paramTypes[i] = givenArgs[i];
+      }
     }
     stack.pop(); // <--- Pop