You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Markus Holzemer (JIRA)" <ji...@apache.org> on 2014/07/10 12:06:05 UTC
[jira] [Created] (FLINK-1018) Logistic Regression deadlocks
Markus Holzemer created FLINK-1018:
--------------------------------------
Summary: Logistic Regression deadlocks
Key: FLINK-1018
URL: https://issues.apache.org/jira/browse/FLINK-1018
Project: Flink
Issue Type: Bug
Reporter: Markus Holzemer
Attachments: LogisticRegression.java
We are currently running our implementation of logistic regression with batch gradient descent on the cluster.
Unfortunatelly for datasets > 1GB it seems to deadlock inside of the iteration. This means the first iteration is never finished.
The iteration does a map over all points, the map gets the iteration input as broadcast variable. The result of the map is reduced and the result of the reducer (1 tuple) is crossed with the iteration input.
There should be no reason for the deadlock, since the data is still quite small compared to the cluster size (4 nodes a 32GB). Also the datasize stays constant throughout the algorithm.
Here is the generated plan. I will also attach the full algorithm.
{code}
{
"nodes": [
{
"id": 2,
"type": "source",
"pact": "Data Source",
"contents": "[([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.",
"parallelism": "1",
"subtasks_per_instance": "1",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "0.0 B" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"step_function": [
{
"id": 7,
"type": "source",
"pact": "Data Source",
"contents": "TextInputFormat (D:/Devel/HIGGS-0.0001.csv) - UTF-8",
"parallelism": "2",
"subtasks_per_instance": "2",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "83.27 MB" },
{ "name": "Est. Cardinality", "value": "113.9. K" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "83.27 MB" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "83.27 MB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 6,
"type": "pact",
"pact": "Map",
"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$6",
"parallelism": "2",
"subtasks_per_instance": "2",
"predecessors": [
{"id": 7, "ship_strategy": "Forward"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "113.9. K" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "83.27 MB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 9,
"type": "pact",
"pact": "Map",
"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$1",
"parallelism": "2",
"subtasks_per_instance": "2",
"predecessors": [
{"id": 6, "ship_strategy": "Forward"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "113.9. K" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "41.63 MB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 8,
"type": "pact",
"pact": "Reduce",
"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2",
"parallelism": "1",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 9, "ship_strategy": "Forward"}
],
"driver_strategy": "Reduce All",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "41.63 MB" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 10,
"type": "pact",
"pact": "Bulk Partial Solution",
"contents": "Partial Solution",
"parallelism": "2",
"subtasks_per_instance": "2",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "0.0 B" },
{ "name": "Cumulative CPU", "value": "0.0 " }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 5,
"type": "pact",
"pact": "Map",
"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$3",
"parallelism": "2",
"subtasks_per_instance": "2",
"predecessors": [
{"id": 6, "side": "first", "ship_strategy": "Forward", "temp_mode": "CACHED"},
{"id": 8, "side": "second", "ship_strategy": "Broadcast"},
{"id": 10, "side": "second", "ship_strategy": "Broadcast"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "113.9. K" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 4,
"type": "pact",
"pact": "Reduce",
"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4",
"parallelism": "1",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 5, "ship_strategy": "Forward"}
],
"driver_strategy": "Reduce All",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "0.0 B" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 3,
"type": "pact",
"pact": "Cross",
"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$5",
"parallelism": "2",
"subtasks_per_instance": "2",
"predecessors": [
{"id": 4, "side": "first", "ship_strategy": "Forward"},
{"id": 10, "side": "second", "ship_strategy": "Broadcast", "temp_mode": "PIPELINE_BREAKER"}
],
"driver_strategy": "Nested Loops (Blocked Outer: de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4)",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
}
],
"partial_solution": 10,
"next_partial_solution": 3,
"id": 1,
"type": "bulk_iteration",
"pact": "Bulk Iteration",
"contents": "Bulk Iteration",
"parallelism": "2",
"subtasks_per_instance": "2",
"predecessors": [
{"id": 2, "ship_strategy": "Forward"}
],
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 0,
"type": "sink",
"pact": "Data Sink",
"contents": "TextOutputFormat (D:/Devel/theta) - UTF-8",
"parallelism": "2",
"subtasks_per_instance": "2",
"predecessors": [
{"id": 1, "ship_strategy": "Forward"}
],
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0 B" },
{ "name": "Disk I/O", "value": "0.0 B" },
{ "name": "CPU", "value": "0.0 " },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
}
]
}
{code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)